ADBC
Arrow Database Connectivity
Loading...
Searching...
No Matches
statement.h
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include <cstdint>
21#include <memory>
22#include <optional>
23#include <string>
24#include <utility>
25#include <variant>
26#include <vector>
27
28#include "driver/framework/base_driver.h"
30#include "driver/framework/utility.h"
31
32namespace adbc::driver {
33
/// \brief A base implementation of a statement.
///
/// Calls to the overridable *Impl hooks are routed through impl(), which
/// casts this object to Derived, so Derived supplies the driver-specific
/// behavior. The statement tracks which mode it is in (no query, bulk
/// ingestion, prepared statement, ad-hoc query) in a std::variant.
///
/// \tparam Derived The concrete statement type; it must expose kErrorPrefix,
///   which is prepended to the error messages produced here.
35template <typename Derived>
36class Statement : public BaseStatement<Derived> {
37 public:
// Alias for this class template instantiation.
38 using Base = Statement<Derived>;
39
/// \brief What to do in ingestion when the table does not exist.
41 enum class TableDoesNotExist {
// Selected by SetOption for the create, create-append and replace ingest modes.
42 kCreate,
// Selected by SetOption for the append ingest mode.
43 kFail,
44 };
45
/// \brief What to do in ingestion when the table already exists.
47 enum class TableExists {
// Selected by SetOption for the append and create-append ingest modes.
48 kAppend,
// Selected by SetOption for the create ingest mode.
49 kFail,
// Selected by SetOption for the replace ingest mode.
50 kReplace,
51 };
52
/// \brief Statement state: initialized with no set query.
54 struct EmptyState {};
/// \brief Statement state: bulk ingestion.
///
/// SetOption switches the statement into this state when one of the ingest
/// options is set, and fills in the fields below.
56 struct IngestState {
// Set from ADBC_INGEST_OPTION_TARGET_CATALOG; empty when unset.
57 std::optional<std::string> target_catalog;
// Set from ADBC_INGEST_OPTION_TARGET_DB_SCHEMA; empty when unset.
58 std::optional<std::string> target_schema;
// Set from ADBC_INGEST_OPTION_TARGET_TABLE.
59 std::optional<std::string> target_table;
// Set from ADBC_INGEST_OPTION_TEMPORARY.
60 bool temporary = false;
// The two fields below are set together from ADBC_INGEST_OPTION_MODE; the
// defaults match what SetOption stores for the create mode.
61 TableDoesNotExist table_does_not_exist_ = TableDoesNotExist::kCreate;
62 TableExists table_exists_ = TableExists::kFail;
63 };
64
/// \brief Statement state: prepared statement.
struct PreparedState {
  /// The query text; Prepare moves it here from QueryState on success.
  std::string query;
};
68
/// \brief Statement state: ad-hoc query.
69 struct QueryState {
// The query text stored by SetSqlQuery.
70 std::string query;
71 };
72
/// \brief Statement state: one of the above.
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
74
75 Statement() : BaseStatement<Derived>() {
76 std::memset(&bind_parameters_, 0, sizeof(bind_parameters_));
77 }
78 ~Statement() = default;
79
80 AdbcStatusCode Bind(ArrowArray* values, ArrowSchema* schema, AdbcError* error) {
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
84 .ToAdbc(error);
85 } else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL stream")
88 .ToAdbc(error);
89 }
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
91 MakeArrayStream(schema, values, &bind_parameters_);
92 return ADBC_STATUS_OK;
93 }
94
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
99 .ToAdbc(error);
100 }
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
102 // Move stream
103 bind_parameters_ = *stream;
104 std::memset(stream, 0, sizeof(*stream));
105 return ADBC_STATUS_OK;
106 }
107
/// \brief Cancel execution. This base implementation does nothing with
/// error and always returns ADBC_STATUS_NOT_IMPLEMENTED.
108 AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; }
109
110 AdbcStatusCode ExecutePartitions(struct ArrowSchema* schema,
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
114 }
115
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
117 AdbcError* error) {
118 return std::visit(
119 [&](auto&& state) -> AdbcStatusCode {
120 using T = std::decay_t<decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
124 .ToAdbc(error);
125 } else if constexpr (std::is_same_v<T, IngestState>) {
126 if (stream) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
129 .ToAdbc(error);
130 }
131 int64_t rows;
132 RAISE_RESULT(error, rows, impl().ExecuteIngestImpl(state));
133 if (rows_affected) {
134 *rows_affected = rows;
135 }
136 return ADBC_STATUS_OK;
137 } else if constexpr (std::is_same_v<T, PreparedState> ||
138 std::is_same_v<T, QueryState>) {
139 int64_t rows = 0;
140 if (stream) {
141 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
142 } else {
143 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
144 }
145 if (rows_affected) {
146 *rows_affected = rows;
147 }
148 return ADBC_STATUS_OK;
149 } else {
150 static_assert(!sizeof(T), "case not implemented");
151 }
152 },
153 state_);
154 }
155
156 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
158 }
159
160 AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) {
161 return std::visit(
162 [&](auto&& state) -> AdbcStatusCode {
163 using T = std::decay_t<decltype(state)>;
164 if constexpr (std::is_same_v<T, EmptyState>) {
165 return status::InvalidState(
166 Derived::kErrorPrefix,
167 " Cannot GetParameterSchema without setting the query")
168 .ToAdbc(error);
169 } else if constexpr (std::is_same_v<T, IngestState>) {
170 return status::InvalidState(Derived::kErrorPrefix,
171 " Cannot GetParameterSchema in bulk ingestion")
172 .ToAdbc(error);
173 } else if constexpr (std::is_same_v<T, PreparedState>) {
174 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
175 } else if constexpr (std::is_same_v<T, QueryState>) {
176 return status::InvalidState(
177 Derived::kErrorPrefix,
178 " Cannot GetParameterSchema without calling Prepare")
179 .ToAdbc(error);
180 } else {
181 static_assert(!sizeof(T), "case not implemented");
182 }
183 },
184 state_);
185 }
186
187 AdbcStatusCode Init(void* parent, AdbcError* error) {
188 this->lifecycle_state_ = LifecycleState::kInitialized;
189 if (auto status = impl().InitImpl(parent); !status.ok()) {
190 return status.ToAdbc(error);
191 }
192 return ObjectBase::Init(parent, error);
193 }
194
195 AdbcStatusCode Prepare(AdbcError* error) {
196 RAISE_STATUS(error, std::visit(
197 [&](auto&& state) -> Status {
198 using T = std::decay_t<decltype(state)>;
199 if constexpr (std::is_same_v<T, EmptyState>) {
200 return status::InvalidState(
201 Derived::kErrorPrefix,
202 " Cannot Prepare without setting the query");
203 } else if constexpr (std::is_same_v<T, IngestState>) {
204 return status::InvalidState(
205 Derived::kErrorPrefix,
206 " Cannot Prepare without setting the query");
207 } else if constexpr (std::is_same_v<T, PreparedState>) {
208 // No-op
209 return status::Ok();
210 } else if constexpr (std::is_same_v<T, QueryState>) {
211 UNWRAP_STATUS(impl().PrepareImpl(state));
212 state_ = PreparedState{std::move(state.query)};
213 return status::Ok();
214 } else {
215 static_assert(!sizeof(T), "case not implemented");
216 }
217 },
218 state_));
219 return ADBC_STATUS_OK;
220 }
221
223 if (bind_parameters_.release) {
224 bind_parameters_.release(&bind_parameters_);
225 bind_parameters_.release = nullptr;
226 }
227 return impl().ReleaseImpl().ToAdbc(error);
228 }
229
230 AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError* error) {
231 auto ensure_ingest = [&]() -> IngestState& {
232 if (!std::holds_alternative<IngestState>(state_)) {
233 state_ = IngestState{};
234 }
235 return std::get<IngestState>(state_);
236 };
237 if (key == ADBC_INGEST_OPTION_MODE) {
238 std::string_view mode;
239 RAISE_RESULT(error, mode, value.AsString());
240 if (mode == ADBC_INGEST_OPTION_MODE_APPEND) {
241 auto& state = ensure_ingest();
242 state.table_does_not_exist_ = TableDoesNotExist::kFail;
243 state.table_exists_ = TableExists::kAppend;
244 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE) {
245 auto& state = ensure_ingest();
246 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
247 state.table_exists_ = TableExists::kFail;
248 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE_APPEND) {
249 auto& state = ensure_ingest();
250 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
251 state.table_exists_ = TableExists::kAppend;
252 } else if (mode == ADBC_INGEST_OPTION_MODE_REPLACE) {
253 auto& state = ensure_ingest();
254 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
255 state.table_exists_ = TableExists::kReplace;
256 } else {
257 return status::InvalidArgument(Derived::kErrorPrefix, " Invalid ingest mode '",
258 key, "': ", value.Format())
259 .ToAdbc(error);
260 }
261 return ADBC_STATUS_OK;
262 } else if (key == ADBC_INGEST_OPTION_TARGET_CATALOG) {
263 if (value.has_value()) {
264 std::string_view catalog;
265 RAISE_RESULT(error, catalog, value.AsString());
266 ensure_ingest().target_catalog = catalog;
267 } else {
268 ensure_ingest().target_catalog = std::nullopt;
269 }
270 return ADBC_STATUS_OK;
271 } else if (key == ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) {
272 if (value.has_value()) {
273 std::string_view schema;
274 RAISE_RESULT(error, schema, value.AsString());
275 ensure_ingest().target_schema = schema;
276 } else {
277 ensure_ingest().target_schema = std::nullopt;
278 }
279 return ADBC_STATUS_OK;
280 } else if (key == ADBC_INGEST_OPTION_TARGET_TABLE) {
281 std::string_view table;
282 RAISE_RESULT(error, table, value.AsString());
283 ensure_ingest().target_table = table;
284 return ADBC_STATUS_OK;
285 } else if (key == ADBC_INGEST_OPTION_TEMPORARY) {
286 bool temporary;
287 RAISE_RESULT(error, temporary, value.AsBool());
288 ensure_ingest().temporary = temporary;
289 return ADBC_STATUS_OK;
290 }
291 return impl().SetOptionImpl(key, value).ToAdbc(error);
292 }
293
294 AdbcStatusCode SetSqlQuery(const char* query, AdbcError* error) {
295 RAISE_STATUS(error, std::visit(
296 [&](auto&& state) -> Status {
297 using T = std::decay_t<decltype(state)>;
298 if constexpr (std::is_same_v<T, EmptyState>) {
299 state_ = QueryState{
300 std::string(query),
301 };
302 return status::Ok();
303 } else if constexpr (std::is_same_v<T, IngestState>) {
304 state_ = QueryState{
305 std::string(query),
306 };
307 return status::Ok();
308 } else if constexpr (std::is_same_v<T, PreparedState>) {
309 state_ = QueryState{
310 std::string(query),
311 };
312 return status::Ok();
313 } else if constexpr (std::is_same_v<T, QueryState>) {
314 state.query = std::string(query);
315 return status::Ok();
316 } else {
317 static_assert(!sizeof(T),
318 "info value type not implemented");
319 }
320 },
321 state_));
322 return ADBC_STATUS_OK;
323 }
324
325 AdbcStatusCode SetSubstraitPlan(const uint8_t* plan, size_t length, AdbcError* error) {
327 }
328
/// \brief Default bulk-ingestion hook: always returns a NotImplemented status.
///
/// ExecuteQuery calls this through impl() when the statement is in
/// IngestState and no result stream was requested; on success it writes the
/// returned value to rows_affected.
329 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
330 return status::NotImplemented(Derived::kErrorPrefix,
331 " Bulk ingest is not implemented");
332 }
333
/// \brief Default query hook for a prepared statement: always returns a
/// NotImplemented status.
///
/// ExecuteQuery calls this through impl() when a result stream was requested;
/// on success it writes the returned value to rows_affected.
334 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
335 return status::NotImplemented(Derived::kErrorPrefix,
336 " ExecuteQuery is not implemented");
337 }
338
/// \brief Default query hook for an ad-hoc query: always returns a
/// NotImplemented status. Called under the same conditions as the
/// PreparedState overload.
339 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
340 return status::NotImplemented(Derived::kErrorPrefix,
341 " ExecuteQuery is not implemented");
342 }
343
/// \brief Default update hook for a prepared statement: always returns a
/// NotImplemented status.
///
/// ExecuteQuery calls this through impl() when no result stream was
/// requested; on success it writes the returned value to rows_affected.
344 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
345 return status::NotImplemented(Derived::kErrorPrefix,
346 " ExecuteQuery (update) is not implemented");
347 }
348
/// \brief Default update hook for an ad-hoc query: always returns a
/// NotImplemented status. Called under the same conditions as the
/// PreparedState overload.
349 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
350 return status::NotImplemented(Derived::kErrorPrefix,
351 " ExecuteQuery (update) is not implemented");
352 }
353
/// \brief Default parameter-schema hook: always returns a NotImplemented
/// status.
///
/// GetParameterSchema calls this through impl() only when the statement is
/// in PreparedState.
354 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
355 return status::NotImplemented(Derived::kErrorPrefix,
356 " GetParameterSchema is not implemented");
357 }
358
/// \brief Default initialization hook: does nothing and returns Ok.
///
/// Init calls this through impl() with the parent pointer it was given.
359 Status InitImpl(void* parent) { return status::Ok(); }
360
/// \brief Default prepare hook: always returns a NotImplemented status.
///
/// Prepare calls this through impl() when the statement is in QueryState and
/// only switches to PreparedState if the returned status is a success.
361 Status PrepareImpl(QueryState& state) {
362 return status::NotImplemented(Derived::kErrorPrefix, " Prepare is not implemented");
363 }
364
/// \brief Default release hook: does nothing and returns Ok.
///
/// Release calls this through impl() after the bound parameters have been
/// released.
365 Status ReleaseImpl() { return status::Ok(); }
366
/// \brief Set an option. May be called prior to InitImpl.
///
/// SetOption forwards here, through impl(), every key that is not one of the
/// bulk-ingest options. This default rejects all of them with a
/// NotImplemented status naming the key and the formatted value.
367 Status SetOptionImpl(std::string_view key, Option value) {
368 return status::NotImplemented(Derived::kErrorPrefix, " Unknown statement option ",
369 key, "=", value.Format());
370 }
371
372 protected:
// The currently bound parameters. Bind and BindStream release the previous
// contents before storing new ones; Release releases them and nulls the
// release callback.
373 ArrowArrayStream bind_parameters_;
374
375 private:
// The current statement mode; starts out as EmptyState.
376 State state_ = State(EmptyState{});
// Casts this object to the derived type so that the *Impl calls made by this
// class resolve against Derived.
377 Derived& impl() { return static_cast<Derived&>(*this); }
378};
379
380} // namespace adbc::driver
Definition base_driver.h:1026
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition base_driver.h:276
A typed option value wrapper. It currently does not attempt conversion (i.e., getting a double option...
Definition base_driver.h:59
Result< std::string_view > AsString() const
Get the value if it is a string.
Definition base_driver.h:127
std::string Format() const
Provide a human-readable summary of the value.
Definition base_driver.h:141
Result< bool > AsBool() const
Try to parse a string value as a boolean.
Definition base_driver.h:83
bool has_value() const
Check whether this option is set.
Definition base_driver.h:80
A base implementation of a statement.
Definition statement.h:36
AdbcStatusCode Release(AdbcError *error)
Finalize the object.
Definition statement.h:222
AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition statement.h:187
TableExists
What to do in ingestion when the table already exists.
Definition statement.h:47
TableDoesNotExist
What to do in ingestion when the table does not exist.
Definition statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
Set an option. May be called prior to InitImpl.
Definition statement.h:367
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
Set an option value.
Definition statement.h:230
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
Statement state: one of the above.
Definition statement.h:73
A wrapper around AdbcStatusCode + AdbcError.
Definition status.h:43
#define ADBC_STATUS_NOT_IMPLEMENTED
The operation is not implemented or supported.
Definition adbc.h:189
uint8_t AdbcStatusCode
Error codes for operations that may fail.
Definition adbc.h:176
#define ADBC_STATUS_OK
No error.
Definition adbc.h:179
A detailed error message for an operation.
Definition adbc.h:283
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
Create an ArrowArrayStream from a given ArrowSchema and ArrowArray.
#define ADBC_INGEST_OPTION_MODE_CREATE
Create the table and insert data; error if the table exists.
Definition adbc.h:807
#define ADBC_INGEST_OPTION_TARGET_CATALOG
The catalog of the table for bulk insert.
Definition adbc.h:828
#define ADBC_INGEST_OPTION_TEMPORARY
Use a temporary table for ingestion.
Definition adbc.h:844
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
The schema of the table for bulk insert.
Definition adbc.h:833
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
Insert data; create the table if it does not exist, or error if the table exists, but the schema does...
Definition adbc.h:823
#define ADBC_INGEST_OPTION_MODE
Whether to create (the default) or append.
Definition adbc.h:804
#define ADBC_INGEST_OPTION_MODE_REPLACE
Create the table and insert data; drop the original table if it already exists.
Definition adbc.h:817
#define ADBC_INGEST_OPTION_MODE_APPEND
Do not create the table, and insert data; error if the table does not exist (ADBC_STATUS_NOT_FOUND) o...
Definition adbc.h:812
#define ADBC_INGEST_OPTION_TARGET_TABLE
The name of the target table for a bulk insert.
Definition adbc.h:799
#define RAISE_RESULT(ERROR, LHS, RHS)
A helper to unwrap a Result in functions returning AdbcStatusCode.
Definition status.h:286
#define UNWRAP_STATUS(rhs)
A helper to unwrap a Status in functions returning Result/Status.
Definition status.h:298
#define RAISE_STATUS(ERROR, RHS)
A helper to unwrap a Status in functions returning AdbcStatusCode.
Definition status.h:290
Statement state: initialized with no set query.
Definition statement.h:54
Statement state: bulk ingestion.
Definition statement.h:56
Statement state: prepared statement.
Definition statement.h:65
Statement state: ad-hoc query.
Definition statement.h:69