38 using Base = Statement<Derived>;
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
60 bool temporary =
false;
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
76 std::memset(&bind_parameters_, 0,
sizeof(bind_parameters_));
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
85 }
else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL stream")
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
103 bind_parameters_ = *stream;
104 std::memset(stream, 0,
sizeof(*stream));
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
120 using T = std::decay_t<
decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
125 }
else if constexpr (std::is_same_v<T, IngestState>) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
133 *rows_affected = rows;
136 }
else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
145 *rows_affected = rows;
149 static_assert(!
sizeof(T),
"case not implemented");
155 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
159 AdbcStatusCode GetParameterSchema(
struct ArrowSchema* schema,
struct AdbcError* error) {
162 using T = std::decay_t<
decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
168 }
else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
172 }
else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 }
else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
180 static_assert(!
sizeof(T),
"case not implemented");
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (
auto status = impl().InitImpl(parent); !status.ok()) {
189 return status.ToAdbc(error);
196 [&](
auto&& state) ->
Status {
197 using T = std::decay_t<
decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 }
else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 }
else if constexpr (std::is_same_v<T, PreparedState>) {
209 }
else if constexpr (std::is_same_v<T, QueryState>) {
214 static_assert(!
sizeof(T),
"case not implemented");
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release =
nullptr;
226 return impl().ReleaseImpl().ToAdbc(error);
231 if (!std::holds_alternative<IngestState>(state_)) {
234 return std::get<IngestState>(state_);
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
255 return status::InvalidArgument(Derived::kErrorPrefix,
" Invalid ingest mode '",
256 key,
"': ", value.
Format())
263 ensure_ingest().target_catalog = catalog;
265 ensure_ingest().target_catalog = std::nullopt;
271 ensure_ingest().target_schema = schema;
273 ensure_ingest().target_schema = std::nullopt;
278 ensure_ingest().target_table = table;
282 ensure_ingest().temporary = temporary;
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
290 [&](
auto&& state) ->
Status {
291 using T = std::decay_t<
decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
297 }
else if constexpr (std::is_same_v<T, IngestState>) {
302 }
else if constexpr (std::is_same_v<T, PreparedState>) {
307 }
else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
311 static_assert(!
sizeof(T),
312 "info value type not implemented");
319 AdbcStatusCode SetSubstraitPlan(
const uint8_t* plan,
size_t length, AdbcError* error) {
323 Result<int64_t> ExecuteIngestImpl(
IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
325 " Bulk ingest is not implemented");
328 Result<int64_t> ExecuteQueryImpl(
PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery is not implemented");
333 Result<int64_t> ExecuteQueryImpl(
QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery is not implemented");
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) is not implemented");
343 Result<int64_t> ExecuteUpdateImpl(
QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) is not implemented");
348 Status GetParameterSchemaImpl(
PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema is not implemented");
353 Status InitImpl(
void* parent) {
return status::Ok(); }
356 return status::NotImplemented(Derived::kErrorPrefix,
" Prepare is not implemented");
359 Status ReleaseImpl() {
return status::Ok(); }
362 return status::NotImplemented(Derived::kErrorPrefix,
" Unknown statement option ",
363 key,
"=", value.
Format());
367 ArrowArrayStream bind_parameters_;
371 Derived& impl() {
return static_cast<Derived&
>(*this); }