// Shorthand for the Statement specialization parameterized on Derived.
38 using Base = Statement<Derived>;
// Ingest-target settings. Presumably members of IngestState: elsewhere in
// this file they are reached through ensure_ingest(), which returns
// std::get<IngestState>(state_) — confirm against the enclosing struct.
// The option-handling code assigns target_catalog and target_schema either a
// string or std::nullopt, and assigns target_table a string.
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
// Defaults to false; overwritten from the option-handling code.
60 bool temporary =
false;
// Sum type of the four state alternatives. The dispatch code in this file
// branches on the active alternative with
// `if constexpr (std::is_same_v<T, ...>)`, and ends each chain with a
// static_assert for any unhandled type.
// NOTE(review): state_ is presumably declared with this type — confirm.
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
76 std::memset(&bind_parameters_, 0,
sizeof(bind_parameters_));
// Reject a null `values` pointer or a struct whose release callback is null.
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
85 }
else if (!schema || !schema->release) {
// NOTE(review): this branch tests `schema`, but the message says "stream".
// Looks like a copy/paste slip — confirm whether it should read "schema".
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL stream")
// If bind_parameters_ currently has a release callback, invoke it on itself.
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
// Takes an ArrowArrayStream as the statement's bind parameters.
// The visible code rejects a null `stream` or one whose release callback is
// null, invokes the release callback of whatever bind_parameters_ currently
// holds, then copies *stream into bind_parameters_ and zeroes the caller's
// struct.
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
// Struct copy: bind_parameters_ now carries the stream's callbacks and data.
103 bind_parameters_ = *stream;
// Zeroing the source leaves its release callback null — presumably so the
// caller cannot release the stream now held in bind_parameters_; confirm.
104 std::memset(stream, 0,
sizeof(*stream));
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
120 using T = std::decay_t<
decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
125 }
else if constexpr (std::is_same_v<T, IngestState>) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
132 RAISE_RESULT(error, rows, impl().ExecuteIngestImpl(state));
134 *rows_affected = rows;
137 }
else if constexpr (std::is_same_v<T, PreparedState> ||
138 std::is_same_v<T, QueryState>) {
141 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
143 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
146 *rows_affected = rows;
150 static_assert(!
sizeof(T),
"case not implemented");
156 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
160 AdbcStatusCode GetParameterSchema(
struct ArrowSchema* schema,
struct AdbcError* error) {
163 using T = std::decay_t<
decltype(state)>;
164 if constexpr (std::is_same_v<T, EmptyState>) {
165 return status::InvalidState(
166 Derived::kErrorPrefix,
167 " Cannot GetParameterSchema without setting the query")
169 }
else if constexpr (std::is_same_v<T, IngestState>) {
170 return status::InvalidState(Derived::kErrorPrefix,
171 " Cannot GetParameterSchema in bulk ingestion")
173 }
else if constexpr (std::is_same_v<T, PreparedState>) {
174 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
175 }
else if constexpr (std::is_same_v<T, QueryState>) {
176 return status::InvalidState(
177 Derived::kErrorPrefix,
178 " Cannot GetParameterSchema without calling Prepare")
181 static_assert(!
sizeof(T),
"case not implemented");
188 this->lifecycle_state_ = LifecycleState::kInitialized;
189 if (
auto status = impl().InitImpl(parent); !status.ok()) {
190 return status.ToAdbc(error);
197 [&](
auto&& state) ->
Status {
198 using T = std::decay_t<
decltype(state)>;
199 if constexpr (std::is_same_v<T, EmptyState>) {
200 return status::InvalidState(
201 Derived::kErrorPrefix,
202 " Cannot Prepare without setting the query");
203 }
else if constexpr (std::is_same_v<T, IngestState>) {
204 return status::InvalidState(
205 Derived::kErrorPrefix,
206 " Cannot Prepare without setting the query");
207 }
else if constexpr (std::is_same_v<T, PreparedState>) {
210 }
else if constexpr (std::is_same_v<T, QueryState>) {
215 static_assert(!
sizeof(T),
"case not implemented");
223 if (bind_parameters_.release) {
224 bind_parameters_.release(&bind_parameters_);
225 bind_parameters_.release =
nullptr;
227 return impl().ReleaseImpl().ToAdbc(error);
232 if (!std::holds_alternative<IngestState>(state_)) {
235 return std::get<IngestState>(state_);
238 std::string_view mode;
241 auto& state = ensure_ingest();
242 state.table_does_not_exist_ = TableDoesNotExist::kFail;
243 state.table_exists_ = TableExists::kAppend;
245 auto& state = ensure_ingest();
246 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
247 state.table_exists_ = TableExists::kFail;
249 auto& state = ensure_ingest();
250 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
251 state.table_exists_ = TableExists::kAppend;
253 auto& state = ensure_ingest();
254 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
255 state.table_exists_ = TableExists::kReplace;
257 return status::InvalidArgument(Derived::kErrorPrefix,
" Invalid ingest mode '",
258 key,
"': ", value.
Format())
264 std::string_view catalog;
266 ensure_ingest().target_catalog = catalog;
268 ensure_ingest().target_catalog = std::nullopt;
273 std::string_view schema;
275 ensure_ingest().target_schema = schema;
277 ensure_ingest().target_schema = std::nullopt;
281 std::string_view table;
283 ensure_ingest().target_table = table;
288 ensure_ingest().temporary = temporary;
291 return impl().SetOptionImpl(key, value).ToAdbc(error);
296 [&](
auto&& state) ->
Status {
297 using T = std::decay_t<
decltype(state)>;
298 if constexpr (std::is_same_v<T, EmptyState>) {
303 }
else if constexpr (std::is_same_v<T, IngestState>) {
308 }
else if constexpr (std::is_same_v<T, PreparedState>) {
313 }
else if constexpr (std::is_same_v<T, QueryState>) {
314 state.query = std::string(query);
// !sizeof(T) is always false, so this assertion fails the build whenever the
// branch containing it is instantiated (presumably the final `else` of the
// if-constexpr chain above — confirm).
// NOTE(review): the message "info value type not implemented" differs from the
// "case not implemented" used by the other static_asserts in this file; it
// looks copy-pasted from elsewhere — confirm and align.
317 static_assert(!
sizeof(T),
318 "info value type not implemented");
325 AdbcStatusCode SetSubstraitPlan(
const uint8_t* plan,
size_t length, AdbcError* error) {
// Default ExecuteIngestImpl: unconditionally returns a NotImplemented status
// whose message starts with Derived::kErrorPrefix. `state` is unused here.
// Callers in this file reach it as impl().ExecuteIngestImpl(state), so a
// Derived class can presumably supply its own version — confirm.
329 Result<int64_t> ExecuteIngestImpl(
IngestState& state) {
330 return status::NotImplemented(Derived::kErrorPrefix,
331 " Bulk ingest is not implemented");
// Default ExecuteQueryImpl for the PreparedState alternative: unconditionally
// returns a NotImplemented status prefixed with Derived::kErrorPrefix.
// Neither `state` nor `stream` is touched. Invoked in this file as
// impl().ExecuteQueryImpl(state, stream).
334 Result<int64_t> ExecuteQueryImpl(
PreparedState& state, ArrowArrayStream* stream) {
335 return status::NotImplemented(Derived::kErrorPrefix,
336 " ExecuteQuery is not implemented");
// Default ExecuteQueryImpl for the QueryState alternative: unconditionally
// returns a NotImplemented status prefixed with Derived::kErrorPrefix.
// Neither `state` nor `stream` is touched. Invoked in this file as
// impl().ExecuteQueryImpl(state, stream).
339 Result<int64_t> ExecuteQueryImpl(
QueryState& state, ArrowArrayStream* stream) {
340 return status::NotImplemented(Derived::kErrorPrefix,
341 " ExecuteQuery is not implemented");
345 return status::NotImplemented(Derived::kErrorPrefix,
346 " ExecuteQuery (update) is not implemented");
// Default ExecuteUpdateImpl for the QueryState alternative: unconditionally
// returns a NotImplemented status prefixed with Derived::kErrorPrefix.
// `state` is unused. Invoked in this file as impl().ExecuteUpdateImpl(state).
349 Result<int64_t> ExecuteUpdateImpl(
QueryState& state) {
350 return status::NotImplemented(Derived::kErrorPrefix,
351 " ExecuteQuery (update) is not implemented");
// Default GetParameterSchemaImpl: unconditionally returns a NotImplemented
// status prefixed with Derived::kErrorPrefix; `schema` is not written.
// GetParameterSchema calls it through impl() only for the PreparedState
// alternative.
354 Status GetParameterSchemaImpl(
PreparedState& state, ArrowSchema* schema) {
355 return status::NotImplemented(Derived::kErrorPrefix,
356 " GetParameterSchema is not implemented");
// Default InitImpl: ignores `parent` and returns status::Ok().
// The initialization code in this file calls it as impl().InitImpl(parent)
// and converts a non-ok status with ToAdbc(error).
359 Status InitImpl(
void* parent) {
return status::Ok(); }
362 return status::NotImplemented(Derived::kErrorPrefix,
" Prepare is not implemented");
// Default ReleaseImpl: does nothing and returns status::Ok().
// The release code in this file returns impl().ReleaseImpl().ToAdbc(error)
// after it has released bind_parameters_.
365 Status ReleaseImpl() {
return status::Ok(); }
368 return status::NotImplemented(Derived::kErrorPrefix,
" Unknown statement option ",
369 key,
"=", value.
Format());
// Currently bound parameter stream. A null `release` callback is treated as
// "nothing bound": the code in this file checks `bind_parameters_.release`
// before invoking it, and zero-fills the struct with memset.
373 ArrowArrayStream bind_parameters_;
// Returns *this viewed as Derived& via static_cast. The methods in this file
// route their *Impl calls through it (e.g. impl().InitImpl(parent)).
// The cast is unchecked, so it assumes the dynamic type really is Derived.
377 Derived& impl() {
return static_cast<Derived&
>(*this); }