Flight SQL Driver

Available for: C/C++, GLib/Ruby, Go, Java, Python, R

The Flight SQL Driver provides access to any database implementing an Arrow Flight SQL compatible endpoint.

Installation

C/C++:

For conda-forge users:

mamba install libadbc-driver-flightsql

Go:

go get github.com/apache/arrow-adbc/go/adbc

Java:

Add a dependency on org.apache.arrow.adbc:adbc-driver-flight-sql.

For Maven users:

<dependency>
  <groupId>org.apache.arrow.adbc</groupId>
  <artifactId>adbc-driver-flight-sql</artifactId>
</dependency>

Python:

# For conda-forge
mamba install adbc-driver-flightsql

# For pip
pip install adbc_driver_flightsql

R:

# install.packages("pak")
pak::pak("apache/arrow-adbc/r/adbcflightsql")

Usage

To connect to a database, supply the “uri” parameter when constructing the AdbcDatabase.

#include "arrow-adbc/adbc.h"

// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);

Note

For detailed examples, see Flight SQL Recipes.

from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect

headers = {"foo": "bar"}

with connect(
    "grpc+tls://localhost:8080",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
        DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
        **{
            f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
            for k, v in headers.items()
        },
    }
) as conn:
    pass

Recipe source: example_usage_test.go

// Tests that use the SQLite server example.

package flightsql_test

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/apache/arrow-adbc/go/adbc"
	drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/flight"
	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
	sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
	"github.com/apache/arrow-go/v18/arrow/memory"
	_ "modernc.org/sqlite"
)

var headers = map[string]string{"foo": "bar"}

func FlightSQLExample(uri string) error {
	ctx := context.Background()
	options := map[string]string{
		adbc.OptionKeyURI: uri,
	}

	for k, v := range headers {
		options[drv.OptionRPCCallHeaderPrefix+k] = v
	}

	var alloc memory.Allocator
	drv := drv.NewDriver(alloc)
	db, err := drv.NewDatabase(options)
	if err != nil {
		return fmt.Errorf("failed to open database: %s\n", err.Error())
	}
	defer db.Close()

	cnxn, err := db.Open(ctx)
	if err != nil {
		return fmt.Errorf("failed to open connection: %s", err.Error())
	}
	defer cnxn.Close()

	stmt, err := cnxn.NewStatement()
	if err != nil {
		return fmt.Errorf("failed to create statement: %s", err.Error())
	}
	defer stmt.Close()

	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
		return fmt.Errorf("failed to set query: %s", err.Error())
	}

	reader, _, err := stmt.ExecuteQuery(ctx)
	if err != nil {
		return fmt.Errorf("failed to execute query: %s", err.Error())
	}
	defer reader.Release()

	for reader.Next() {
		arr, ok := reader.Record().Column(0).(*array.Int64)
		if !ok {
			return fmt.Errorf("result data was not int64")
		}
		for i := 0; i < arr.Len(); i++ {
			if arr.IsNull(i) {
				fmt.Println("theresult: NULL")
			} else {
				fmt.Printf("theresult: %d\n", arr.Value(i))
			}
		}
	}

	return nil
}

func Example() {
	// For this example we will spawn the Flight SQL server ourselves.

	// Create a new database that isn't tied to any other databases that
	// may be in process.
	db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	srv, err := sqlite.NewSQLiteFlightSQLServer(db)
	if err != nil {
		log.Fatal(err)
	}

	server := flight.NewServerWithMiddleware(nil)
	server.RegisterFlightService(flightsql.NewFlightServer(srv))
	err = server.Init("localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		if err := server.Serve(); err != nil {
			log.Fatal(err)
		}
	}()

	uri := fmt.Sprintf("grpc://%s", server.Addr().String())
	if err := FlightSQLExample(uri); err != nil {
		log.Printf("Error: %s\n", err.Error())
	}

	server.Shutdown()

	// Output:
	// theresult: 1
}

Supported Features

The Flight SQL driver generally supports features defined in the ADBC API specification 1.0.0, as well as some additional, custom options.

Warning

The Java driver does not support all options here. See issue #745.

Authentication

The driver does no authentication by default. The driver implements a few optional authentication schemes:

  • Mutual TLS (mTLS): see “Client Options” below.

  • An HTTP-style scheme mimicking the Arrow Flight SQL JDBC driver.

    Set the options username and password on the AdbcDatabase. Alternatively, set the option adbc.flight.sql.authorization_header for full control.

    The client provides credentials by sending an authorization header to the server. The server then responds with an authorization header on the first request, and the value of that header is sent back as the authorization header on all subsequent requests.
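The two ways of supplying credentials described above can be sketched as plain option dictionaries. The option names come from this page; the helper names password_options and basic_authorization_options are hypothetical, and the use of the HTTP Basic format for a hand-built header is an assumption that should be checked against the server in use.

```python
import base64


def password_options(username: str, password: str) -> dict:
    """Options that let the driver perform the username/password handshake."""
    return {"username": username, "password": password}


def basic_authorization_options(username: str, password: str) -> dict:
    """Options that send a fixed authorization header on every request.

    Assumption: the server accepts an HTTP Basic credential. Servers that
    expect a bearer token would need "Bearer <token>" here instead.
    """
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"adbc.flight.sql.authorization_header": f"Basic {token}"}


if __name__ == "__main__":
    print(password_options("user", "pass"))
    print(basic_authorization_options("user", "pass"))
```

In Python, either dictionary could be passed as db_kwargs to connect(), in the same way as in the usage example earlier on this page.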

Bulk Ingestion

Flight SQL does not have a dedicated API for bulk ingestion of Arrow data into a given table. The driver does not currently implement bulk ingestion as a result.

Client Options

The options used for creating the Flight RPC client can be customized.

Note

Many of these options simply wrap a gRPC option. For more details about what these options do, consult the gRPC documentation.

adbc.flight.sql.client_option.authority

Override gRPC’s :authority pseudo-header.

Python: adbc_driver_flightsql.DatabaseOptions.AUTHORITY

adbc.flight.sql.client_option.mtls_cert_chain

The certificate chain to use for mTLS.

Python: adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN

adbc.flight.sql.client_option.mtls_private_key

The private key to use for mTLS.

Python: adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY

adbc.flight.sql.client_option.tls_override_hostname

Override the hostname used to verify the server’s TLS certificate.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME

adbc.flight.sql.client_option.tls_root_certs

Override the root certificates used to validate the server’s TLS certificate.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS

adbc.flight.sql.client_option.tls_skip_verify

Disable verification of the server’s TLS certificate. Value should be true or false.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY

adbc.flight.sql.client_option.with_block

Warning

This option is deprecated as gRPC itself has deprecated the underlying option.

This option has no effect and will be removed in a future release. Value should be true or false.

adbc.flight.sql.client_option.with_max_msg_size

The maximum message size to accept from the server. The driver defaults to 16 MiB since Flight services tend to return larger response payloads. Should be a positive integer number of bytes.

Python: adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE

adbc.flight.sql.authorization_header

Directly specify the value of the authorization header to send on all requests.

Python: adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER

adbc.flight.sql.rpc.with_cookie_middleware

Enable or disable middleware that processes and handles “set-cookie” metadata headers returned from the server and sends “Cookie” headers back from the client. Value should be true or false. Default is false.

Python: adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
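As a rough illustration of how several of these options combine, the sketch below collects TLS-related settings into one dictionary of string options. The option names are taken from the list above; the helper client_options, its parameters, and its validation rules are hypothetical. It also assumes that the certificate and key options take PEM text rather than file paths, which should be verified against the driver in use.

```python
CLIENT_OPTION_PREFIX = "adbc.flight.sql.client_option."


def client_options(
    *,
    tls_root_certs=None,
    mtls_cert_chain=None,
    mtls_private_key=None,
    tls_override_hostname=None,
    tls_skip_verify=False,
    max_msg_size=None,
):
    """Build a dictionary of Flight RPC client options (all values are strings)."""
    # mTLS needs both halves of the key pair; reject a partial configuration.
    if (mtls_cert_chain is None) != (mtls_private_key is None):
        raise ValueError("mtls_cert_chain and mtls_private_key must be set together")
    if max_msg_size is not None and max_msg_size <= 0:
        raise ValueError("max_msg_size must be a positive number of bytes")

    options = {}
    if tls_root_certs is not None:
        options[CLIENT_OPTION_PREFIX + "tls_root_certs"] = tls_root_certs
    if mtls_cert_chain is not None:
        options[CLIENT_OPTION_PREFIX + "mtls_cert_chain"] = mtls_cert_chain
        options[CLIENT_OPTION_PREFIX + "mtls_private_key"] = mtls_private_key
    if tls_override_hostname is not None:
        options[CLIENT_OPTION_PREFIX + "tls_override_hostname"] = tls_override_hostname
    if tls_skip_verify:
        options[CLIENT_OPTION_PREFIX + "tls_skip_verify"] = "true"
    if max_msg_size is not None:
        options[CLIENT_OPTION_PREFIX + "with_max_msg_size"] = str(max_msg_size)
    return options


if __name__ == "__main__":
    # Placeholder strings stand in for real PEM contents.
    print(client_options(mtls_cert_chain="<cert PEM>", mtls_private_key="<key PEM>"))
```

Leaving tls_skip_verify out of the dictionary unless it is explicitly enabled keeps certificate verification on by default in this sketch.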

Custom Call Headers

Custom HTTP headers can be attached to requests via options that apply to AdbcDatabase, AdbcConnection, and AdbcStatement.

adbc.flight.sql.rpc.call_header.<HEADER NAME>

Add the header <HEADER NAME> to outgoing requests with the given value.

Warning

Header names must be in all lowercase.
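A small sketch of how the option keys for custom headers can be built from a dictionary of header names and values. The prefix is the one documented above; the helper call_header_options is hypothetical, and rejecting mixed-case names up front is a choice made for this sketch rather than documented driver behavior.

```python
CALL_HEADER_PREFIX = "adbc.flight.sql.rpc.call_header."


def call_header_options(headers):
    """Map header names to option keys, refusing names that are not lowercase."""
    options = {}
    for name, value in headers.items():
        if name != name.lower():
            raise ValueError(f"header name must be all lowercase: {name!r}")
        options[CALL_HEADER_PREFIX + name] = value
    return options


if __name__ == "__main__":
    print(call_header_options({"x-tenant": "acme"}))
```

The resulting dictionary has the same shape as the one built inline with RPC_CALL_HEADER_PREFIX in the Python usage example.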

Distributed Result Sets

The driver will fetch all partitions (FlightEndpoints) returned by the server, in an unspecified order (note that Flight SQL itself does not define an ordering on these partitions). If an endpoint has no locations, the data will be fetched using the original server connection. Otherwise, the driver will try each location given, in order, until a request succeeds; if a connection or request fails, it moves on to the next location.

The driver does not currently cache or pool these secondary connections. It also does not retry connections or requests.
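The location fallback described above can be modeled in a few lines. This is an illustration of the rule only, not the driver's code: the function fetch_endpoint and its callback parameters are hypothetical, failures are represented by ConnectionError, and raising an error once every location has failed is an assumption.

```python
def fetch_endpoint(locations, fetch_from, fetch_from_origin):
    """Fetch one endpoint's data, trying each location once and in order.

    locations: list of location URIs attached to the endpoint.
    fetch_from: callable taking a location and returning data (may raise).
    fetch_from_origin: callable that reuses the original server connection.
    """
    if not locations:
        # No locations: read the data over the original connection.
        return fetch_from_origin()

    last_error = None
    for location in locations:
        try:
            return fetch_from(location)
        except ConnectionError as exc:
            # No retry of the same location; move on to the next one.
            last_error = exc
    raise ConnectionError("all locations failed") from last_error


if __name__ == "__main__":
    def flaky(location):
        if location == "grpc://a:1":
            raise ConnectionError("unreachable")
        return f"data from {location}"

    print(fetch_endpoint(["grpc://a:1", "grpc://b:2"], flaky, lambda: "origin data"))
```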

All partitions are fetched in parallel. A limited number of batches are queued per partition. Data is returned to the client in the order of the partitions.

Some behavior can be configured on the AdbcStatement:

adbc.rpc.result_queue_size

The number of batches to queue per partition. Defaults to 5.

Python: adbc_driver_flightsql.StatementOptions.QUEUE_SIZE
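To make the effect of the queue size concrete, here is a toy model of the behavior described in this section: every partition is read concurrently into its own bounded queue, and the consumer drains the queues in partition order. It is a sketch built on Python threads, not the driver's implementation; the function read_partitions is hypothetical and partitions are represented as plain lists of batches.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of one partition


def read_partitions(partitions, queue_size=5):
    """Yield batches partition by partition while all partitions load in parallel."""
    queues = [queue.Queue(maxsize=queue_size) for _ in partitions]

    def produce(batches, out):
        for batch in batches:
            out.put(batch)  # blocks once queue_size batches are waiting
        out.put(_DONE)

    for batches, out in zip(partitions, queues):
        threading.Thread(target=produce, args=(batches, out), daemon=True).start()

    # Drain the queues strictly in partition order.
    for out in queues:
        while True:
            item = out.get()
            if item is _DONE:
                break
            yield item


if __name__ == "__main__":
    for batch in read_partitions([["p0-b0", "p0-b1"], ["p1-b0"]], queue_size=1):
        print(batch)
```

In this model a larger queue size lets later partitions buffer more data while earlier ones are still being consumed, at the cost of memory.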

Incremental Execution

By setting ADBC_STATEMENT_OPTION_INCREMENTAL, you can use nonblocking execution with this driver. This changes the behavior of AdbcStatementExecutePartitions() only. When enabled, ExecutePartitions will return every time there are new partitions (in Flight SQL terms, when there are new FlightEndpoints) from the server, instead of blocking until the query is complete.

Some behavior can be configured on the AdbcStatement:

adbc.flight.sql.statement.exec.last_flight_info

Get the serialized bytes for the most recent FlightInfo returned by the service. This is a low-level option intended for advanced usage. It is most useful when incremental execution is enabled, for inspecting the latest server response without waiting for AdbcStatementExecutePartitions() to return.

Python: adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO

Metadata

The driver currently will not populate column constraint info (foreign keys, primary keys, etc.) in AdbcConnectionGetObjects(). Also, catalog filters are evaluated as simple string matches, not LIKE-style patterns.
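The difference between the two kinds of filter can be illustrated as follows. The sketch assumes that a "simple string match" means exact equality, and the helper names exact_match and like_match are hypothetical; the catalog names are made up for the example.

```python
import re


def exact_match(pattern, name):
    """Plain string comparison: '%' and '_' have no special meaning."""
    return pattern == name


def like_match(pattern, name):
    """LIKE-style comparison: '%' matches any run of characters, '_' exactly one."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, name) is not None


if __name__ == "__main__":
    catalogs = ["main", "main_backup", "temp"]
    print([c for c in catalogs if exact_match("main%", c)])
    print([c for c in catalogs if like_match("main%", c)])
```

Under this assumption, a catalog filter such as main% only matches a catalog literally named main%, so callers that rely on patterns need to filter the results themselves.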

Partitioned Result Sets

The Flight SQL driver supports ADBC’s partitioned result sets. When requested, each partition of a result set contains a serialized FlightInfo, containing one of the FlightEndpoints of the original response. Clients who may wish to introspect the partition can do so by deserializing the contained FlightInfo from the ADBC partitions. (For example, a client that wishes to distribute work across multiple workers or machines may want to try to take advantage of locality information that ADBC does not have.)

Sessions

The driver exposes Flight SQL session support via options on the connection. There is no explicit command to start a new session; it is expected that the server itself will manage this. (You will most likely need to enable cookie support as described above.) There is no explicit command to close a session; this is always issued when the connection is closed.

adbc.flight.sql.session.options

Get all options as a JSON blob.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS

adbc.flight.sql.session.option.

Get or set a string/numeric session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionerase.

Erase a session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionbool.

Get or set a boolean session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionstringlist.

Get or set a string list session option. The contents should be a serialized JSON list.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
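Because the option key depends on the type of the session option, a small helper can pick the right prefix. The prefixes are the ones listed above; the helper session_option is hypothetical, and encoding booleans as the strings "true" and "false" is an assumption based on how other boolean options on this page are described.

```python
import json

SESSION_OPTION_PREFIX = "adbc.flight.sql.session.option."
SESSION_BOOL_PREFIX = "adbc.flight.sql.session.optionbool."
SESSION_STRING_LIST_PREFIX = "adbc.flight.sql.session.optionstringlist."


def session_option(name, value):
    """Return the (option key, option value) pair for one session option."""
    # bool must be checked before numbers, since bool is a subclass of int.
    if isinstance(value, bool):
        return SESSION_BOOL_PREFIX + name, "true" if value else "false"
    if isinstance(value, (list, tuple)):
        # String lists are sent as a serialized JSON list.
        return SESSION_STRING_LIST_PREFIX + name, json.dumps(list(value))
    if isinstance(value, (str, int, float)):
        return SESSION_OPTION_PREFIX + name, value
    raise TypeError(f"unsupported session option type: {type(value).__name__}")


if __name__ == "__main__":
    print(session_option("schema", "public"))
    print(session_option("search_path", ["a", "b"]))
```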

Timeouts

By default, timeouts are not used for RPC calls. They can be set via special options on AdbcConnection. In general, it is best practice to set timeouts to avoid unexpectedly getting stuck. The options are as follows:

adbc.flight.sql.rpc.timeout_seconds.fetch

A timeout (in floating-point seconds) for any API calls that fetch data. This corresponds to Flight DoGet calls.

For example, this controls the timeout of the underlying Flight calls that fetch more data as a result set is consumed.

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH

adbc.flight.sql.rpc.timeout_seconds.query

A timeout (in floating-point seconds) for any API calls that execute a query. This corresponds to Flight GetFlightInfo calls.

For example, this controls the timeout of the underlying Flight calls that implement AdbcStatementExecuteQuery().

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY

adbc.flight.sql.rpc.timeout_seconds.update

A timeout (in floating-point seconds) for any API calls that upload data or perform other updates.

For example, this controls the timeout of the underlying Flight calls that implement bulk ingestion or transaction support.

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE

There is also a timeout that is set on the AdbcDatabase:

adbc.flight.sql.rpc.timeout_seconds.connect

A timeout (in floating-point seconds) for establishing a connection. The default is 20 seconds.
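A minimal sketch of assembling these timeout options, keeping the connection-level and database-level options apart. The option names are the ones documented above; the helper names are hypothetical, values are rendered as strings on the assumption that string option values are accepted everywhere, and rejecting negative values is a choice made for this sketch.

```python
TIMEOUT_PREFIX = "adbc.flight.sql.rpc.timeout_seconds."


def _seconds(value):
    """Render a timeout as a floating-point string, rejecting negative values."""
    seconds = float(value)
    if seconds < 0:
        raise ValueError("timeout must not be negative")
    return str(seconds)


def connection_timeout_options(*, fetch=None, query=None, update=None):
    """Timeout options that are set on the AdbcConnection."""
    options = {}
    for kind, value in (("fetch", fetch), ("query", query), ("update", update)):
        if value is not None:
            options[TIMEOUT_PREFIX + kind] = _seconds(value)
    return options


def database_timeout_options(connect):
    """The connect timeout, which is set on the AdbcDatabase instead."""
    return {TIMEOUT_PREFIX + "connect": _seconds(connect)}


if __name__ == "__main__":
    print(connection_timeout_options(fetch=30, query=2.5))
    print(database_timeout_options(10))
```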

Transactions

The driver supports transactions. It will first check the server’s SqlInfo to determine whether the server supports them. If it does not, transaction-related ADBC APIs will return ADBC_STATUS_NOT_IMPLEMENTED.