28#include "driver/framework/base_driver.h" 
   30#include "driver/framework/utility.h" 
   32namespace adbc::driver {
 
   35template <
typename Derived>
 
   57    std::optional<std::string> target_catalog;
 
   58    std::optional<std::string> target_schema;
 
   59    std::optional<std::string> target_table;
 
   60    bool temporary = 
false;
 
 
   /// \brief Statement state: a variant holding exactly one of EmptyState,
   /// IngestState, PreparedState, or QueryState at any time.
   73  using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
 
   76    std::memset(&bind_parameters_, 0, 
sizeof(bind_parameters_));
 
   81    if (!values || !values->release) {
 
   82      return status::InvalidArgument(Derived::kErrorPrefix,
 
   83                                     " Bind: must provide non-NULL array")
 
   85    } 
else if (!schema || !schema->release) {
 
   86      return status::InvalidArgument(Derived::kErrorPrefix,
 
   87                                     " Bind: must provide non-NULL stream")
 
   90    if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
 
   96    if (!stream || !stream->release) {
 
   97      return status::InvalidArgument(Derived::kErrorPrefix,
 
   98                                     " BindStream: must provide non-NULL stream")
 
  101    if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
 
  103    bind_parameters_ = *stream;
 
  104    std::memset(stream, 0, 
sizeof(*stream));
 
  112                                   int64_t* rows_affected, 
AdbcError* error) {
 
  116  AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
 
  120          using T = std::decay_t<
decltype(state)>;
 
  121          if constexpr (std::is_same_v<T, EmptyState>) {
 
  122            return status::InvalidState(Derived::kErrorPrefix,
 
  123                                        " Cannot ExecuteQuery without setting the query")
 
  125          } 
else if constexpr (std::is_same_v<T, IngestState>) {
 
  127              return status::InvalidState(Derived::kErrorPrefix,
 
  128                                          " Cannot ingest with result set")
 
  131            RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
 
  133              *rows_affected = rows;
 
  136          } 
else if constexpr (std::is_same_v<T, PreparedState> ||
 
  137                               std::is_same_v<T, QueryState>) {
 
  140              RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
 
  142              RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
 
  145              *rows_affected = rows;
 
  149            static_assert(!
sizeof(T), 
"case not implemented");
 
  162          using T = std::decay_t<
decltype(state)>;
 
  163          if constexpr (std::is_same_v<T, EmptyState>) {
 
  164            return status::InvalidState(
 
  165                       Derived::kErrorPrefix,
 
  166                       " Cannot GetParameterSchema without setting the query")
 
  168          } 
else if constexpr (std::is_same_v<T, IngestState>) {
 
  169            return status::InvalidState(Derived::kErrorPrefix,
 
  170                                        " Cannot GetParameterSchema in bulk ingestion")
 
  172          } 
else if constexpr (std::is_same_v<T, PreparedState>) {
 
  173            return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
 
  174          } 
else if constexpr (std::is_same_v<T, QueryState>) {
 
  175            return status::InvalidState(
 
  176                       Derived::kErrorPrefix,
 
  177                       " Cannot GetParameterSchema without calling Prepare")
 
  180            static_assert(!
sizeof(T), 
"case not implemented");
 
  187    this->lifecycle_state_ = LifecycleState::kInitialized;
 
  188    if (
auto status = impl().InitImpl(parent); !status.
ok()) {
 
  189      return status.ToAdbc(error);
 
 
  196                            [&](
auto&& state) -> 
Status {
 
  197                              using T = std::decay_t<
decltype(state)>;
 
  198                              if constexpr (std::is_same_v<T, EmptyState>) {
 
  199                                return status::InvalidState(
 
  200                                    Derived::kErrorPrefix,
 
  201                                    " Cannot Prepare without setting the query");
 
  202                              } 
else if constexpr (std::is_same_v<T, IngestState>) {
 
  203                                return status::InvalidState(
 
  204                                    Derived::kErrorPrefix,
 
  205                                    " Cannot Prepare without setting the query");
 
  206                              } 
else if constexpr (std::is_same_v<T, PreparedState>) {
 
  209                              } 
else if constexpr (std::is_same_v<T, QueryState>) {
 
  211                                state_ = PreparedState{std::move(state.query)};
 
  214                                static_assert(!
sizeof(T), 
"case not implemented");
 
  222    if (bind_parameters_.release) {
 
  223      bind_parameters_.release(&bind_parameters_);
 
  224      bind_parameters_.release = 
nullptr;
 
  226    return impl().ReleaseImpl().ToAdbc(error);
 
 
  231      if (!std::holds_alternative<IngestState>(state_)) {
 
  234      return std::get<IngestState>(state_);
 
  239        auto& state = ensure_ingest();
 
  240        state.table_does_not_exist_ = TableDoesNotExist::kFail;
 
  241        state.table_exists_ = TableExists::kAppend;
 
  243        auto& state = ensure_ingest();
 
  244        state.table_does_not_exist_ = TableDoesNotExist::kCreate;
 
  245        state.table_exists_ = TableExists::kFail;
 
  247        auto& state = ensure_ingest();
 
  248        state.table_does_not_exist_ = TableDoesNotExist::kCreate;
 
  249        state.table_exists_ = TableExists::kAppend;
 
  251        auto& state = ensure_ingest();
 
  252        state.table_does_not_exist_ = TableDoesNotExist::kCreate;
 
  253        state.table_exists_ = TableExists::kReplace;
 
  255        return status::InvalidArgument(Derived::kErrorPrefix, 
" Invalid ingest mode '",
 
  256                                       key, 
"': ", value.
Format())
 
  263        ensure_ingest().target_catalog = catalog;
 
  265        ensure_ingest().target_catalog = std::nullopt;
 
  271        ensure_ingest().target_schema = schema;
 
  273        ensure_ingest().target_schema = std::nullopt;
 
  278      ensure_ingest().target_table = table;
 
  282      ensure_ingest().temporary = temporary;
 
  285    return impl().SetOptionImpl(key, value).ToAdbc(error);
 
 
  290                            [&](
auto&& state) -> 
Status {
 
  291                              using T = std::decay_t<
decltype(state)>;
 
  292                              if constexpr (std::is_same_v<T, EmptyState>) {
 
  297                              } 
else if constexpr (std::is_same_v<T, IngestState>) {
 
  302                              } 
else if constexpr (std::is_same_v<T, PreparedState>) {
 
  307                              } 
else if constexpr (std::is_same_v<T, QueryState>) {
 
  308                                state.query = std::string(query);
 
  311                                static_assert(!
sizeof(T),
 
  312                                              "info value type not implemented");
 
  323  Result<int64_t> ExecuteIngestImpl(IngestState& state) {
 
  324    return status::NotImplemented(Derived::kErrorPrefix,
 
  325                                  " Bulk ingest is not implemented");
 
  328  Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
 
  329    return status::NotImplemented(Derived::kErrorPrefix,
 
  330                                  " ExecuteQuery is not implemented");
 
  333  Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
 
  334    return status::NotImplemented(Derived::kErrorPrefix,
 
  335                                  " ExecuteQuery is not implemented");
 
  338  Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
 
  339    return status::NotImplemented(Derived::kErrorPrefix,
 
  340                                  " ExecuteQuery (update) is not implemented");
 
  343  Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
 
  344    return status::NotImplemented(Derived::kErrorPrefix,
 
  345                                  " ExecuteQuery (update) is not implemented");
 
  348  Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
 
  349    return status::NotImplemented(Derived::kErrorPrefix,
 
  350                                  " GetParameterSchema is not implemented");
 
   /// \brief Default initialization hook: ignores `parent` and returns
   /// status::Ok().
   ///
   /// The initialization path calls this as impl().InitImpl(parent), i.e.
   /// through a Derived&, so an InitImpl declared by Derived is the one
   /// that is actually invoked.
   353  Status InitImpl(
void* parent) { 
return status::Ok(); }
 
  355  Status PrepareImpl(QueryState& state) {
 
  356    return status::NotImplemented(Derived::kErrorPrefix, 
" Prepare is not implemented");
 
   /// \brief Default release hook: does nothing and returns status::Ok().
   ///
   /// The release path calls this as impl().ReleaseImpl() once it has
   /// handled bind_parameters_, and converts the returned Status with
   /// ToAdbc(error).
   359  Status ReleaseImpl() { 
return status::Ok(); }
 
  362    return status::NotImplemented(Derived::kErrorPrefix, 
" Unknown statement option ",
 
  363                                  key, 
"=", value.
Format());
 
 
   /// \brief Stream holding the currently bound parameters.
   ///
   /// Zero-filled with memset at construction; a non-null `release` member
   /// marks it as holding a live stream, which is released before being
   /// replaced and again when the statement is released.
   367  ArrowArrayStream bind_parameters_;
 
   /// \brief View this object as the Derived type (static downcast of
   /// *this), so that the *Impl calls made through impl() resolve against
   /// Derived's members.
   371  Derived& impl() { 
return static_cast<Derived&
>(*this); }
 
 
Definition base_driver.h:1022
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition base_driver.h:274
A typed option value wrapper. It currently does not attempt conversion (i.e., getting a double option as a string).
Definition base_driver.h:59
Result< std::string_view > AsString() const
Get the value if it is a string.
Definition base_driver.h:126
std::string Format() const
Provide a human-readable summary of the value.
Definition base_driver.h:139
Result< bool > AsBool() const
Try to parse a string value as a boolean.
Definition base_driver.h:83
bool has_value() const
Check whether this option is set.
Definition base_driver.h:80
A base implementation of a statement.
Definition statement.h:36
AdbcStatusCode Release(AdbcError *error)
Finalize the object.
Definition statement.h:221
AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition statement.h:186
TableExists
What to do in ingestion when the table already exists.
Definition statement.h:47
TableDoesNotExist
What to do in ingestion when the table does not exist.
Definition statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
Set an option. May be called prior to InitImpl.
Definition statement.h:361
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
Set an option value.
Definition statement.h:229
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
Statement state: one of the above.
Definition statement.h:73
A wrapper around AdbcStatusCode + AdbcError.
Definition status.h:43
bool ok() const
Check if this is an error or not.
Definition status.h:64
#define ADBC_STATUS_NOT_IMPLEMENTED
The operation is not implemented or supported.
Definition adbc.h:187
uint8_t AdbcStatusCode
Error codes for operations that may fail.
Definition adbc.h:176
#define ADBC_STATUS_OK
No error.
Definition adbc.h:179
A detailed error message for an operation.
Definition adbc.h:269
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
Create an ArrowArrayStream from a given ArrowSchema and ArrowArray.
#define ADBC_INGEST_OPTION_MODE_CREATE
Create the table and insert data; error if the table exists.
Definition adbc.h:761
#define ADBC_INGEST_OPTION_TARGET_CATALOG
The catalog of the table for bulk insert.
Definition adbc.h:778
#define ADBC_INGEST_OPTION_TEMPORARY
Use a temporary table for ingestion.
Definition adbc.h:792
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
The schema of the table for bulk insert.
Definition adbc.h:782
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
Insert data; create the table if it does not exist, or error if the table exists, but the schema does not match the schema of the new data (ADBC_STATUS_ALREADY_EXISTS).
Definition adbc.h:774
#define ADBC_INGEST_OPTION_MODE
Whether to create (the default) or append.
Definition adbc.h:759
#define ADBC_INGEST_OPTION_MODE_REPLACE
Create the table and insert data; drop the original table if it already exists.
Definition adbc.h:769
#define ADBC_INGEST_OPTION_MODE_APPEND
Do not create the table, and insert data; error if the table does not exist (ADBC_STATUS_NOT_FOUND) or does not match the schema of the data to append (ADBC_STATUS_ALREADY_EXISTS).
Definition adbc.h:765
#define ADBC_INGEST_OPTION_TARGET_TABLE
The name of the target table for a bulk insert.
Definition adbc.h:755
The partitions of a distributed/partitioned result set.
Definition adbc.h:897
#define RAISE_RESULT(ERROR, LHS, RHS)
A helper to unwrap a Result in functions returning AdbcStatusCode.
Definition status.h:277
#define UNWRAP_STATUS(rhs)
A helper to unwrap a Status in functions returning Result/Status.
Definition status.h:286
#define RAISE_STATUS(ERROR, RHS)
A helper to unwrap a Status in functions returning AdbcStatusCode.
Definition status.h:280
Statement state: initialized with no set query.
Definition statement.h:54
Statement state: bulk ingestion.
Definition statement.h:56
Statement state: prepared statement.
Definition statement.h:65
Statement state: ad-hoc query.
Definition statement.h:69