Driver Example

Recipe source: driver_example.cc

Here we’ll show the structure of an ADBC driver built in C++ using the ADBC driver framework library. ADBC uses this same library to build its SQLite and PostgreSQL drivers. It abstracts away the details of C callables and catalog/metadata functions, which can be difficult to implement but are essential for efficiently leveraging the rest of the ADBC ecosystem.

At a high level, we’ll be building a driver whose “database” is a directory where each “table” is a file containing an Arrow IPC stream. Tables can be written using the bulk ingest feature and read with a simple query of the form SELECT * FROM (the file).

Installation

This quickstart is actually a literate C++ file. You can clone the repository, build the sample, and follow along.

We’ll assume you’re using conda-forge for dependencies. CMake, a C++17 compiler, and the ADBC libraries are required. They can be installed as follows:

mamba install cmake compilers libadbc-driver-manager

Building

We’ll use CMake here. From a source checkout of the ADBC repository:

mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest

Building an ADBC Driver using C++

Let’s start with some includes. Notably, we’ll need the driver framework header files and nanoarrow, which we’ll use to create and consume the Arrow C data interface structures in this example driver.

#include "driver_example.h"

#include <cstdio>
#include <cstring>  // strlen(), used when stripping the "file://" prefix
#include <string>
#include <string_view>

#include "driver/framework/connection.h"
#include "driver/framework/database.h"
#include "driver/framework/statement.h"

#include "nanoarrow/nanoarrow.hpp"
#include "nanoarrow/nanoarrow_ipc.hpp"

#include "arrow-adbc/adbc.h"

Next, we’ll bring a few essential framework types into the namespace to reduce the verbosity of the implementation:

  • adbc::driver::Option: Options can be set on an ADBC database, connection, and statement. They can be strings, opaque binary, doubles, or integers. The Option class abstracts the details of how to get, set, and parse these values.

  • adbc::driver::Status: The Status is the ADBC driver framework’s error handling mechanism: functions with no return value that can fail return a Status. You can use UNWRAP_STATUS(some_call()) as shorthand for Status status = some_call(); if (!status.ok()) return status; to succinctly propagate errors.

  • adbc::driver::Result: The Result<T> is used as a return value for functions that on success return a value of type T and on failure communicate their error using a Status. You can use UNWRAP_RESULT(some_type value, some_call()) as shorthand for

    some_type value;
    Result<some_type> maybe_value = some_call();
    if (!maybe_value.status().ok()) {
      return maybe_value.status();
    } else {
      value = *maybe_value;
    }
    
using adbc::driver::Option;
using adbc::driver::Result;
using adbc::driver::Status;

namespace {
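
To make the Status and Result pattern described above concrete, here is a minimal, self-contained sketch. Every name in it (SketchStatus, SketchResult, the SKETCH_UNWRAP_* macros, ParsePositive, SumOfTwo, CheckBoth) is hypothetical and exists only for illustration; the framework’s own types and macros are richer and may be implemented differently.

```cpp
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type: success by default, or an error message.
class SketchStatus {
 public:
  SketchStatus() = default;
  explicit SketchStatus(std::string message)
      : ok_(false), message_(std::move(message)) {}
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  bool ok_ = true;
  std::string message_;
};

// Hypothetical stand-in for a result type: either a value or a failed status.
template <typename T>
class SketchResult {
 public:
  SketchResult(T value) : value_(std::move(value)) {}
  SketchResult(SketchStatus status) : status_(std::move(status)) {}
  const SketchStatus& status() const { return status_; }
  const T& operator*() const { return *value_; }  // only valid if status().ok()

 private:
  SketchStatus status_;
  std::optional<T> value_;
};

#define SKETCH_CONCAT_IMPL(a, b) a##b
#define SKETCH_CONCAT(a, b) SKETCH_CONCAT_IMPL(a, b)

// Return early from the enclosing function if the expression failed.
#define SKETCH_UNWRAP_STATUS(expr)                   \
  do {                                               \
    SketchStatus sketch_status_ = (expr);            \
    if (!sketch_status_.ok()) return sketch_status_; \
  } while (0)

// Declare `lhs` from a result, or return its status early. Not wrapped in
// do/while because `lhs` must stay visible to the code that follows.
#define SKETCH_UNWRAP_RESULT(lhs, expr)                       \
  auto SKETCH_CONCAT(sketch_result_, __LINE__) = (expr);      \
  if (!SKETCH_CONCAT(sketch_result_, __LINE__).status().ok()) \
    return SKETCH_CONCAT(sketch_result_, __LINE__).status();  \
  lhs = *SKETCH_CONCAT(sketch_result_, __LINE__)

// Parse a string of decimal digits, failing on anything else.
SketchResult<int> ParsePositive(const std::string& text) {
  if (text.empty()) return SketchStatus("empty input");
  int value = 0;
  for (char c : text) {
    if (c < '0' || c > '9') return SketchStatus("not a number: " + text);
    value = value * 10 + (c - '0');
  }
  return value;
}

// The first failure propagates to the caller without explicit if/else blocks.
SketchResult<int> SumOfTwo(const std::string& a, const std::string& b) {
  SKETCH_UNWRAP_RESULT(int lhs, ParsePositive(a));
  SKETCH_UNWRAP_RESULT(int rhs, ParsePositive(b));
  return lhs + rhs;
}

// A function with no return value reports only success or failure.
SketchStatus CheckBoth(const std::string& a, const std::string& b) {
  SKETCH_UNWRAP_STATUS(SumOfTwo(a, b).status());
  return SketchStatus();
}
```

In this sketch, SumOfTwo("2", "40") yields a successful result, while SumOfTwo("2", "x") returns the failed status produced by the second ParsePositive() call.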

Next, we’ll provide the database implementation. The driver framework uses the Curiously Recurring Template Pattern (CRTP). The framework handles the details of this pattern; functionally, you are still just overriding methods from a base class.
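
For readers unfamiliar with CRTP, the following is a generic, self-contained sketch of the pattern and not the framework’s actual class hierarchy. SketchObjectBase and SketchDatabase are hypothetical names, and the sketch uses purely static dispatch, whereas the framework’s base classes may combine the pattern with other mechanisms.

```cpp
#include <string>
#include <string_view>

// Hypothetical CRTP base: the template parameter is the derived class itself,
// so the base can call into it without knowing its concrete type up front.
template <typename Derived>
class SketchObjectBase {
 public:
  // Public entry point shared by every derived class.
  bool SetOption(std::string_view key, std::string_view value) {
    return static_cast<Derived*>(this)->SetOptionImpl(key, value);
  }

  // Fallback for keys the derived class does not recognize.
  bool SetOptionImpl(std::string_view /*key*/, std::string_view /*value*/) {
    return false;
  }
};

// Hypothetical derived class: handles "uri" and defers everything else.
class SketchDatabase : public SketchObjectBase<SketchDatabase> {
 public:
  using Base = SketchObjectBase<SketchDatabase>;

  bool SetOptionImpl(std::string_view key, std::string_view value) {
    if (key == "uri") {
      uri_ = std::string(value);
      return true;
    }
    return Base::SetOptionImpl(key, value);
  }

  const std::string& uri() const { return uri_; }

 private:
  std::string uri_;
};
```

Calling SetOption("uri", ...) on a SketchDatabase reaches the derived SetOptionImpl() through the base class, and unknown keys fall through to the base implementation, which mirrors the "defer to Base" shape used in the driver code in this recipe.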

Here, our database implementation will simply record the uri passed by the user. We interpret it as a file:// uri pointing to the directory where our IPC files are written and read. This is the role of the database in ADBC: a shared handle to a database that potentially caches some shared state among connections, but which still allows multiple connections to execute against the database concurrently.

class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
 public:
  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";

  Status SetOptionImpl(std::string_view key, Option value) override {
    // Handle and validate options implemented by this driver
    if (key == "uri") {
      UNWRAP_RESULT(std::string_view uri, value.AsString());

      if (uri.find("file://") != 0) {
        return adbc::driver::status::InvalidArgument(
            "[example] uri must start with 'file://'");
      }

      uri_ = uri;
      return adbc::driver::status::Ok();
    }

    // Defer to the base implementation to handle state managed by the base
    // class (and error for all other options).
    return Base::SetOptionImpl(key, value);
  }

  Result<Option> GetOption(std::string_view key) override {
    // Return the value of options implemented by this driver
    if (key == "uri") {
      return Option(uri_);
    }

    // Defer to the base implementation to handle state managed by the base
    // class (and error for all other options).
    return Base::GetOption(key);
  }

  // This is called after zero or more calls to SetOption().
  Status InitImpl() override {
    if (uri_.empty()) {
      return adbc::driver::status::InvalidArgument(
          "[example] Must set uri to a non-empty value");
    }

    return Base::InitImpl();
  }

  // Getters for members needed by the connection and/or statement:
  const std::string& uri() { return uri_; }

 private:
  std::string uri_;
};

Next, we implement the connection. While the role of the database is typically to store or cache information, the role of the connection is to provide resource handles that might be expensive to obtain (e.g., negotiating authentication when connecting to a database). Because our example “database” is just a directory, we don’t need to do much in our connection in terms of resource management except to provide a way for child statements to access the database’s uri.

Another role of the connection is to provide metadata about tables, columns, statistics, and other catalog-like information a caller might want to know before issuing a query. The driver framework base classes provide helpers to implement these functions such that you can mostly implement them in terms of the C++17 standard library (as opposed to building the C-level arrays yourself).

class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
 public:
  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";

  // Get information from the database and/or store a reference if needed.
  Status InitImpl(void* parent) {
    auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
    uri_ = database.uri();
    return Base::InitImpl(parent);
  }

  // Getters for members needed by the statement:
  const std::string& uri() { return uri_; }

 private:
  std::string uri_;
};

Next, we provide the statement implementation. The statement is where query execution is managed. Because our data source is quite literally Arrow data, we don’t have to provide a layer that manages type or value conversion. The SQLite and PostgreSQL drivers both dedicate many lines of code to implementing and testing these conversions efficiently. The nanoarrow library can be used to implement conversions in both directions and is the subject of a separate article.

class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
 public:
  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";

  // Get information from the connection and/or store a reference if needed.
  Status InitImpl(void* parent) {
    auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
    uri_ = connection.uri();
    return Base::InitImpl(parent);
  }

  // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
  // using the target table as the filename.
  Result<int64_t> ExecuteIngestImpl(IngestState& state) {
    std::string directory = uri_.substr(strlen("file://"));
    std::string filename = directory + "/" + *state.target_table;

    nanoarrow::ipc::UniqueOutputStream output_stream;
    FILE* c_file = std::fopen(filename.c_str(), "wb");
    UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
                                                        /*close_on_release*/ true));

    nanoarrow::ipc::UniqueWriter writer;
    UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));

    ArrowError nanoarrow_error;
    ArrowErrorInit(&nanoarrow_error);
    UNWRAP_NANOARROW(nanoarrow_error, Internal,
                     ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
                                                    &nanoarrow_error));

    return -1;
  }

  // Our implementation of query execution is to accept a simple query in the form
  // SELECT * FROM (the filename).
  Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
    std::string prefix("SELECT * FROM ");
    if (state.query.find(prefix) != 0) {
      return adbc::driver::status::InvalidArgument(
          "[example] Query must be in the form 'SELECT * FROM filename'");
    }

    std::string directory = uri_.substr(strlen("file://"));
    std::string filename = directory + "/" + state.query.substr(prefix.size());

    nanoarrow::ipc::UniqueInputStream input_stream;
    FILE* c_file = std::fopen(filename.c_str(), "rb");
    UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
                                                       /*close_on_release*/ true));

    UNWRAP_ERRNO(Internal,
                 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
    return -1;
  }

  // This path is taken when the user calls Prepare() first.
  Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
    QueryState query_state{state.query};
    return ExecuteQueryImpl(query_state, stream);
  }

 private:
  std::string uri_;
};

}  // namespace
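
The path handling in the statement above (strip the file:// scheme, strip the SELECT * FROM prefix, join the two with a slash) can be isolated into a small, testable helper. The sketch below is a hypothetical refactoring for illustration, not code from the recipe; SketchResolveTablePath is an invented name, and it additionally rejects an empty table name, which the statement code does not do.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper: maps a database uri and a query to the path of the
// Arrow IPC stream file, or std::nullopt if either input is malformed.
std::optional<std::string> SketchResolveTablePath(std::string_view uri,
                                                  std::string_view query) {
  constexpr std::string_view kScheme = "file://";
  constexpr std::string_view kPrefix = "SELECT * FROM ";

  // Both inputs must start with the expected prefix.
  if (uri.substr(0, kScheme.size()) != kScheme) return std::nullopt;
  if (query.substr(0, kPrefix.size()) != kPrefix) return std::nullopt;

  // Everything after the prefix is treated as the file name.
  std::string_view table = query.substr(kPrefix.size());
  if (table.empty()) return std::nullopt;

  // Join directory and file name, as the statement implementation does.
  std::string path(uri.substr(kScheme.size()));
  path += "/";
  path += table;
  return path;
}
```

With this sketch, a uri of file:///tmp/db and the query SELECT * FROM example.arrows resolve to /tmp/db/example.arrows. Like the statement code, it performs no sanitization of the table name, so a real driver would likely want to validate it before touching the filesystem.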

Finally, we create the driver initializer function, which is what the driver manager needs to provide implementations for the Adbc**() functions that comprise the ADBC C API. The name of this function matters: this file will be built into a shared library named libdriver_example.(so|dll|dylib), so the driver manager will look for the symbol AdbcDriverExampleInit() as the default entry point when asked to load the driver "driver_example".

extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
                                                AdbcError* error) {
  using ExampleDriver =
      adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
                           DriverExampleStatement>;
  return ExampleDriver::Init(version, raw_driver, error);
}
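
To illustrate why the function name matters, the sketch below approximates how a default entry point name could be derived from a library name: drop the directory, the lib prefix, and the extension, convert snake_case to PascalCase, ensure an Adbc prefix, and append Init. SketchDefaultEntrypoint is a hypothetical helper written for this explanation. It is an assumption about the naming convention, not the driver manager’s actual code, which may handle additional cases.

```cpp
#include <cctype>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical approximation of the default entry point naming convention.
std::string SketchDefaultEntrypoint(std::string_view library) {
  // Drop any directory components.
  std::size_t slash = library.find_last_of("/\\");
  if (slash != std::string_view::npos) library.remove_prefix(slash + 1);

  // Drop a leading "lib" and everything from the first '.' onward.
  if (library.substr(0, 3) == "lib") library.remove_prefix(3);
  library = library.substr(0, library.find('.'));

  // Convert snake_case (or kebab-case) to PascalCase.
  std::string name;
  bool upper_next = true;
  for (char c : library) {
    if (c == '_' || c == '-') {
      upper_next = true;
      continue;
    }
    name += upper_next
                ? static_cast<char>(std::toupper(static_cast<unsigned char>(c)))
                : c;
    upper_next = false;
  }

  // Ensure the "Adbc" prefix and add the "Init" suffix.
  if (name.rfind("Adbc", 0) != 0) name = "Adbc" + name;
  return name + "Init";
}
```

Under these assumptions, both "driver_example" and "build/libdriver_example.so" map to AdbcDriverExampleInit, which matches the symbol exported above.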

Low-level testing

Recipe source: driver_example_test.cc

After we’ve written a sketch of the driver, the next step is to ensure that it can be loaded by the driver manager and that the database, connection, and statement instances can be initialized and released.

First, we’ll include the driver manager and googletest.

#include "driver_example.h"

#include "arrow-adbc/adbc_driver_manager.h"
#include "gtest/gtest.h"

Next we’ll declare a test case for the basic lifecycle:

TEST(DriverExample, TestLifecycle) {
  struct AdbcError error = ADBC_ERROR_INIT;

  struct AdbcDatabase database;
  ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
  AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
  ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
            ADBC_STATUS_OK);
  ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);

  struct AdbcConnection connection;
  ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
  ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);

  struct AdbcStatement statement;
  ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);

  ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
  ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
  ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);

  if (error.release) {
    error.release(&error);
  }
}

Drivers that live in the apache/arrow-adbc repository can use the built-in validation library that implements a generic test suite against a fully-featured SQL database and provides utilities to test a range of inputs and outputs.

High-level testing

Recipe source: driver_example.py

After verifying the basic driver functionality, we can use the adbc_driver_manager Python package’s built-in dbapi implementation to expose a ready-to-go Pythonic database API. This is also useful for high-level testing!

First, we’ll import pathlib for a few path calculations and the adbc_driver_manager’s dbapi module:

from pathlib import Path

from adbc_driver_manager import dbapi

Next, we’ll define a connect() function that wraps dbapi.connect() with the location of the shared library we built using CMake in the previous section. For the purposes of our tutorial, this will be in the CMake build/ directory.

def connect(uri: str):
    build_dir = Path(__file__).parent / "build"
    for lib in [
        "libdriver_example.dylib",
        "libdriver_example.so",
        "driver_example.dll",
    ]:
        driver_lib = build_dir / lib
        if driver_lib.exists():
            return dbapi.connect(
                driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
            )

    raise RuntimeError("Can't find driver shared object")

Next, we can give our driver a go! The two pieces we implemented in the driver were the “bulk ingest” feature and “select all from”, so let’s see if they work!

if __name__ == "__main__":
    import os

    import pyarrow

    with connect(uri=Path(__file__).parent.as_uri()) as con:
        data = pyarrow.table({"col": [1, 2, 3]})
        with con.cursor() as cur:
            cur.adbc_ingest("example.arrows", data, mode="create")

        with con.cursor() as cur:
            cur.execute("SELECT * FROM example.arrows")
            print(cur.fetchall())

        os.unlink(Path(__file__).parent / "example.arrows")

High-level tests can also be written in R using the adbcdrivermanager package.

library(adbcdrivermanager)

drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)

data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")