28#include "driver/framework/base_driver.h"
30#include "driver/framework/utility.h"
32namespace adbc::driver {
35template <
typename Derived>
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
60 bool temporary =
false;
/// The statement's current state. Being a std::variant, it holds exactly one
/// of EmptyState, IngestState, PreparedState, or QueryState at any time.
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
76 std::memset(&bind_parameters_, 0,
sizeof(bind_parameters_));
// Validate both inputs up front: each pointer must be non-null and must still
// have its release callback set. The array is checked first, then the schema.
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
85 }
else if (!schema || !schema->release) {
// This branch rejects the schema argument, so the message names the schema
// (it previously said "stream", which is the BindStream wording).
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL schema")
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
103 bind_parameters_ = *stream;
104 std::memset(stream, 0,
sizeof(*stream));
112 int64_t* rows_affected,
AdbcError* error) {
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
120 using T = std::decay_t<
decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
125 }
else if constexpr (std::is_same_v<T, IngestState>) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
133 *rows_affected = rows;
136 }
else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
145 *rows_affected = rows;
149 static_assert(!
sizeof(T),
"case not implemented");
162 using T = std::decay_t<
decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
168 }
else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
172 }
else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 }
else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
180 static_assert(!
sizeof(T),
"case not implemented");
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (
auto status = impl().InitImpl(parent); !status.
ok()) {
189 return status.ToAdbc(error);
196 [&](
auto&& state) ->
Status {
197 using T = std::decay_t<
decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 }
else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 }
else if constexpr (std::is_same_v<T, PreparedState>) {
209 }
else if constexpr (std::is_same_v<T, QueryState>) {
211 state_ = PreparedState{std::move(state.query)};
214 static_assert(!
sizeof(T),
"case not implemented");
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release =
nullptr;
226 return impl().ReleaseImpl().ToAdbc(error);
231 if (!std::holds_alternative<IngestState>(state_)) {
234 return std::get<IngestState>(state_);
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
255 return status::InvalidArgument(Derived::kErrorPrefix,
" Invalid ingest mode '",
256 key,
"': ", value.
Format())
263 ensure_ingest().target_catalog = catalog;
265 ensure_ingest().target_catalog = std::nullopt;
271 ensure_ingest().target_schema = schema;
273 ensure_ingest().target_schema = std::nullopt;
278 ensure_ingest().target_table = table;
282 ensure_ingest().temporary = temporary;
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
290 [&](
auto&& state) ->
Status {
291 using T = std::decay_t<
decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
297 }
else if constexpr (std::is_same_v<T, IngestState>) {
302 }
else if constexpr (std::is_same_v<T, PreparedState>) {
307 }
else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
311 static_assert(!
sizeof(T),
312 "info value type not implemented");
323 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
325 " Bulk ingest is not implemented");
328 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery is not implemented");
333 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery is not implemented");
338 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) is not implemented");
343 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) is not implemented");
348 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema is not implemented");
/// Default initialization hook: ignores `parent` and reports success.
/// The Init() path calls this as impl().InitImpl(parent) and converts any
/// non-ok Status into an AdbcStatusCode, so this default never fails Init().
/// NOTE(review): presumably a Derived class supplies its own InitImpl to
/// replace this one — confirm against the concrete drivers.
353 Status InitImpl(
void* parent) {
return status::Ok(); }
355 Status PrepareImpl(QueryState& state) {
356 return status::NotImplemented(Derived::kErrorPrefix,
" Prepare is not implemented");
/// Default release hook: does no work of its own and reports success.
/// The release path invokes this as impl().ReleaseImpl() after it has released
/// the bound parameter stream, and returns this result converted via ToAdbc.
359 Status ReleaseImpl() {
return status::Ok(); }
362 return status::NotImplemented(Derived::kErrorPrefix,
" Unknown statement option ",
363 key,
"=", value.
Format());
/// The currently bound parameter stream. It is zeroed with memset at
/// construction, released through its own release callback before a new
/// binding replaces it, and released again (with the callback cleared) when the
/// statement is released. A null release callback means nothing is bound.
367 ArrowArrayStream bind_parameters_;
/// Returns this object viewed as Derived (the class template's type parameter),
/// so that calls such as impl().InitImpl(...) are resolved against Derived.
/// NOTE(review): the static_cast is only valid when this object really is a
/// Derived — presumably guaranteed by how the template is instantiated; confirm.
371 Derived& impl() {
return static_cast<Derived&
>(*this); }
Definition base_driver.h:1022
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition base_driver.h:274
A typed option value wrapper. It currently does not attempt conversion (i.e., getting a double option...
Definition base_driver.h:59
Result< std::string_view > AsString() const
Get the value if it is a string.
Definition base_driver.h:126
std::string Format() const
Provide a human-readable summary of the value.
Definition base_driver.h:139
Result< bool > AsBool() const
Try to parse a string value as a boolean.
Definition base_driver.h:83
bool has_value() const
Check whether this option is set.
Definition base_driver.h:80
A base implementation of a statement.
Definition statement.h:36
AdbcStatusCode Release(AdbcError *error)
Finalize the object.
Definition statement.h:221
AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition statement.h:186
TableExists
What to do in ingestion when the table already exists.
Definition statement.h:47
TableDoesNotExist
What to do in ingestion when the table does not exist.
Definition statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
Set an option. May be called prior to InitImpl.
Definition statement.h:361
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
Set an option value.
Definition statement.h:229
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
Statement state: one of the above.
Definition statement.h:73
A wrapper around AdbcStatusCode + AdbcError.
Definition status.h:43
bool ok() const
Check if this is an error or not.
Definition status.h:64
#define ADBC_STATUS_NOT_IMPLEMENTED
The operation is not implemented or supported.
Definition adbc.h:187
uint8_t AdbcStatusCode
Error codes for operations that may fail.
Definition adbc.h:176
#define ADBC_STATUS_OK
No error.
Definition adbc.h:179
A detailed error message for an operation.
Definition adbc.h:269
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
Create an ArrowArrayStream from a given ArrowSchema and ArrowArray.
#define ADBC_INGEST_OPTION_MODE_CREATE
Create the table and insert data; error if the table exists.
Definition adbc.h:761
#define ADBC_INGEST_OPTION_TARGET_CATALOG
The catalog of the table for bulk insert.
Definition adbc.h:778
#define ADBC_INGEST_OPTION_TEMPORARY
Use a temporary table for ingestion.
Definition adbc.h:792
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
The schema of the table for bulk insert.
Definition adbc.h:782
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
Insert data; create the table if it does not exist, or error if the table exists, but the schema does not match the schema of the data to append (ADBC_STATUS_ALREADY_EXISTS).
Definition adbc.h:774
#define ADBC_INGEST_OPTION_MODE
Whether to create (the default) or append.
Definition adbc.h:759
#define ADBC_INGEST_OPTION_MODE_REPLACE
Create the table and insert data; drop the original table if it already exists.
Definition adbc.h:769
#define ADBC_INGEST_OPTION_MODE_APPEND
Do not create the table, and insert data; error if the table does not exist (ADBC_STATUS_NOT_FOUND) or does not match the schema of the data to append (ADBC_STATUS_ALREADY_EXISTS).
Definition adbc.h:765
#define ADBC_INGEST_OPTION_TARGET_TABLE
The name of the target table for a bulk insert.
Definition adbc.h:755
The partitions of a distributed/partitioned result set.
Definition adbc.h:897
#define RAISE_RESULT(ERROR, LHS, RHS)
A helper to unwrap a Result in functions returning AdbcStatusCode.
Definition status.h:277
#define UNWRAP_STATUS(rhs)
A helper to unwrap a Status in functions returning Result/Status.
Definition status.h:286
#define RAISE_STATUS(ERROR, RHS)
A helper to unwrap a Status in functions returning AdbcStatusCode.
Definition status.h:280
Statement state: initialized with no set query.
Definition statement.h:54
Statement state: bulk ingestion.
Definition statement.h:56
Statement state: prepared statement.
Definition statement.h:65
Statement state: ad-hoc query.
Definition statement.h:69