Flight SQL Driver

Available for: C/C++, GLib/Ruby, Go, Java, Python, R

The Flight SQL Driver provides access to any database implementing an Arrow Flight SQL compatible endpoint.

Installation

C/C++

For conda-forge users:

mamba install libadbc-driver-flightsql

Go

go get github.com/apache/arrow-adbc/go/adbc

Java

Add a dependency on org.apache.arrow.adbc:adbc-driver-flight-sql.

For Maven users:

<dependency>
  <groupId>org.apache.arrow.adbc</groupId>
  <artifactId>adbc-driver-flight-sql</artifactId>
</dependency>

Python

# For conda-forge
mamba install adbc-driver-flightsql

# For pip
pip install adbc_driver_flightsql

R

# install.packages("pak")
pak::pak("apache/arrow-adbc/r/adbcflightsql")

Usage

To connect to a database, supply the “uri” parameter when constructing the AdbcDatabase.

#include "arrow-adbc/adbc.h"

// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);

Note

For detailed examples, see Flight SQL Recipes.

from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect

headers = {"foo": "bar"}

with connect(
    "grpc+tls://localhost:8080",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
        DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
        **{
            f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
            for k, v in headers.items()
        },
    }
) as conn:
    pass

Recipe source: example_usage_test.go

// Tests that use the SQLite server example.

package flightsql_test

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"

	"github.com/apache/arrow-adbc/go/adbc"
	drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/flight"
	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
	sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
	"github.com/apache/arrow-go/v18/arrow/memory"
	_ "modernc.org/sqlite"
)

var headers = map[string]string{"foo": "bar"}

func FlightSQLExample(uri string) (err error) {
	ctx := context.Background()
	options := map[string]string{
		adbc.OptionKeyURI: uri,
	}

	for k, v := range headers {
		options[drv.OptionRPCCallHeaderPrefix+k] = v
	}

	var alloc memory.Allocator
	drv := drv.NewDriver(alloc)
	db, err := drv.NewDatabase(options)
	if err != nil {
		return fmt.Errorf("failed to open database: %s\n", err.Error())
	}
	defer func() {
		err = errors.Join(err, db.Close())
	}()

	cnxn, err := db.Open(ctx)
	if err != nil {
		return fmt.Errorf("failed to open connection: %s", err.Error())
	}
	defer func() {
		err = errors.Join(err, cnxn.Close())
	}()

	stmt, err := cnxn.NewStatement()
	if err != nil {
		return fmt.Errorf("failed to create statement: %s", err.Error())
	}
	defer func() {
		err = errors.Join(err, stmt.Close())
	}()

	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
		return fmt.Errorf("failed to set query: %s", err.Error())
	}

	reader, _, err := stmt.ExecuteQuery(ctx)
	if err != nil {
		return fmt.Errorf("failed to execute query: %s", err.Error())
	}
	defer reader.Release()

	for reader.Next() {
		arr, ok := reader.Record().Column(0).(*array.Int64)
		if !ok {
			return fmt.Errorf("result data was not int64")
		}
		for i := 0; i < arr.Len(); i++ {
			if arr.IsNull(i) {
				fmt.Println("theresult: NULL")
			} else {
				fmt.Printf("theresult: %d\n", arr.Value(i))
			}
		}
	}

	return nil
}

func Example() {
	// For this example we will spawn the Flight SQL server ourselves.

	// Create a new database that isn't tied to any other databases that
	// may be in process.
	db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := db.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()

	srv, err := sqlite.NewSQLiteFlightSQLServer(db)
	if err != nil {
		log.Fatal(err)
	}

	server := flight.NewServerWithMiddleware(nil)
	server.RegisterFlightService(flightsql.NewFlightServer(srv))
	err = server.Init("localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		if err := server.Serve(); err != nil {
			log.Fatal(err)
		}
	}()

	uri := fmt.Sprintf("grpc://%s", server.Addr().String())
	if err := FlightSQLExample(uri); err != nil {
		log.Printf("Error: %s\n", err.Error())
	}

	server.Shutdown()

	// Output:
	// theresult: 1
}
stdout
theresult: 1

Supported Features

The Flight SQL driver generally supports features defined in the ADBC API specification 1.0.0, as well as some additional, custom options.

Warning

The Java driver does not support all options here. See issue #745.

Authentication

The driver does no authentication by default. The driver implements a few optional authentication schemes:

  • Mutual TLS (mTLS): see “Client Options” below.

  • An HTTP-style scheme mimicking the Arrow Flight SQL JDBC driver.

    Set the options username and password on the AdbcDatabase. Alternatively, set the option adbc.flight.sql.authorization_header for full control.

    The client provides credentials by sending an authorization header to the server. The server then responds with an authorization header on the first request. The value of this header will then be sent back as the authorization header on all future requests.

  • OAuth 2.0 authentication flows.

    The client provides configuration that allows the client application to obtain access tokens from an authorization server. The obtained token is then sent in the authorization header on all future requests.
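As a minimal sketch of the HTTP-style scheme, the options can be assembled as plain string key/value pairs. The helper names and the placeholder credentials below are hypothetical; only the option keys come from this page. In Python such a dictionary would typically be passed as db_kwargs to adbc_driver_flightsql.dbapi.connect, which is not called here so that the sketch runs without a server.

```python
from typing import Dict


def basic_auth_options(username: str, password: str) -> Dict[str, str]:
    """Database options for the username/password (HTTP-style) scheme."""
    return {"username": username, "password": password}


def authorization_header_options(token: str) -> Dict[str, str]:
    """Database options that set the authorization header value directly."""
    return {"adbc.flight.sql.authorization_header": f"Bearer {token}"}


# Placeholder credentials, for illustration only.
print(basic_auth_options("alice", "s3cret"))
print(authorization_header_options("my-token"))
```

Use one approach or the other: the username/password pair lets the driver perform the handshake, while the authorization header option gives full control over the header value.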

Bulk Ingestion

Flight SQL does not have a dedicated API for bulk ingestion of Arrow data into a given table. The driver does not currently implement bulk ingestion as a result.

Client Options

The options used for creating the Flight RPC client can be customized.

Note

Many of these options simply wrap a gRPC option. For more details about what these options do, consult the gRPC documentation.

adbc.flight.sql.client_option.authority

Override gRPC’s :authority pseudo-header.

Python: adbc_driver_flightsql.DatabaseOptions.AUTHORITY

adbc.flight.sql.client_option.mtls_cert_chain

The certificate chain to use for mTLS.

Python: adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN

adbc.flight.sql.client_option.mtls_private_key

The private key to use for mTLS.

Python: adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY

adbc.flight.sql.client_option.tls_override_hostname

Override the hostname used to verify the server’s TLS certificate.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME

adbc.flight.sql.client_option.tls_root_certs

Override the root certificates used to validate the server’s TLS certificate.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS

adbc.flight.sql.client_option.tls_skip_verify

Disable verification of the server’s TLS certificate. Value should be true or false.

Python: adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY

adbc.flight.sql.client_option.with_block

Warning

This option is deprecated as gRPC itself has deprecated the underlying option.

This option has no effect and will be removed in a future release. Value should be true or false.

adbc.flight.sql.client_option.with_max_msg_size

The maximum message size to accept from the server. The driver defaults to 16 MiB since Flight services tend to return larger response payloads. Should be a positive integer number of bytes.

Python: adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE

adbc.flight.sql.authorization_header

Directly specify the value of the authorization header to send on all requests.

Python: adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER

adbc.flight.sql.rpc.with_cookie_middleware

Enable or disable middleware that processes and handles “set-cookie” metadata headers returned from the server and sends “Cookie” headers back from the client. Value should be true or false. Default is false.

Python: adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
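The client options above are ordinary string-valued database options. The following sketch collects the mTLS and message-size options into one dictionary. The helper name is hypothetical, and the sketch assumes the certificate options take PEM text rather than file paths; check the driver documentation for your language before relying on that.

```python
from typing import Dict, Optional

CLIENT_OPTION_PREFIX = "adbc.flight.sql.client_option."


def tls_client_options(
    cert_chain_pem: str,
    private_key_pem: str,
    root_certs_pem: Optional[str] = None,
    max_msg_size_bytes: int = 16 * 1024 * 1024,  # 16 MiB, the documented default
) -> Dict[str, str]:
    """Build client options for mTLS plus an explicit maximum message size."""
    if max_msg_size_bytes <= 0:
        raise ValueError("max_msg_size_bytes must be a positive integer")
    options = {
        # Assumption: values are PEM-encoded text, not file paths.
        CLIENT_OPTION_PREFIX + "mtls_cert_chain": cert_chain_pem,
        CLIENT_OPTION_PREFIX + "mtls_private_key": private_key_pem,
        CLIENT_OPTION_PREFIX + "with_max_msg_size": str(max_msg_size_bytes),
    }
    if root_certs_pem is not None:
        options[CLIENT_OPTION_PREFIX + "tls_root_certs"] = root_certs_pem
    return options


# Placeholder PEM strings; real code would read them from disk or a secret store.
opts = tls_client_options("<cert chain PEM>", "<private key PEM>")
print(opts[CLIENT_OPTION_PREFIX + "with_max_msg_size"])
```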

Custom Call Headers

Custom HTTP headers can be attached to requests via options that apply to AdbcDatabase, AdbcConnection, and AdbcStatement.

adbc.flight.sql.rpc.call_header.<HEADER NAME>

Add the header <HEADER NAME> to outgoing requests with the given value.

Python: adbc_driver_flightsql.ConnectionOptions.RPC_CALL_HEADER_PREFIX

Warning

Header names must be in all lowercase.
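One way to avoid the lowercase pitfall is to normalize header names while building the option keys. This is a small sketch with a hypothetical helper name; the prefix is the option name documented above.

```python
from typing import Dict, Mapping

CALL_HEADER_PREFIX = "adbc.flight.sql.rpc.call_header."


def call_header_options(headers: Mapping[str, str]) -> Dict[str, str]:
    """Map header names to driver option keys, lowercasing each name."""
    return {CALL_HEADER_PREFIX + name.lower(): value for name, value in headers.items()}


print(call_header_options({"X-Request-Id": "abc123"}))
```

The resulting dictionary can be merged into the options for an AdbcDatabase, AdbcConnection, or AdbcStatement, depending on how widely the header should apply.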

OAuth 2.0 Options

Supported configurations to obtain tokens using OAuth 2.0 authentication flows.

adbc.flight.sql.oauth.flow

Specifies the OAuth 2.0 flow type to use. Possible values: client_credentials, token_exchange

adbc.flight.sql.oauth.client_id

Unique identifier issued to the client application by the authorization server

adbc.flight.sql.oauth.client_secret

Secret associated with the client_id. Used to authenticate the client application to the authorization server

adbc.flight.sql.oauth.token_uri

The endpoint URL where the client application requests tokens from the authorization server

adbc.flight.sql.oauth.scope

Space-separated list of permissions that the client is requesting access to (e.g., "read.all offline_access")

adbc.flight.sql.oauth.exchange.subject_token

The security token that the client application wants to exchange

adbc.flight.sql.oauth.exchange.subject_token_type

Identifier for the type of the subject token. Check list below for supported token types.

adbc.flight.sql.oauth.exchange.actor_token

A security token that represents the identity of the acting party

adbc.flight.sql.oauth.exchange.actor_token_type

Identifier for the type of the actor token. Check list below for supported token types.

adbc.flight.sql.oauth.exchange.aud

The intended audience for the requested security token

adbc.flight.sql.oauth.exchange.resource

The resource server where the client intends to use the requested security token

adbc.flight.sql.oauth.exchange.scope

Specific permissions requested for the new token

adbc.flight.sql.oauth.exchange.requested_token_type

The type of token the client wants to receive in exchange. Check list below for supported token types.

Supported token types:
  • urn:ietf:params:oauth:token-type:access_token

  • urn:ietf:params:oauth:token-type:refresh_token

  • urn:ietf:params:oauth:token-type:id_token

  • urn:ietf:params:oauth:token-type:saml1

  • urn:ietf:params:oauth:token-type:saml2

  • urn:ietf:params:oauth:token-type:jwt
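The OAuth 2.0 options are also plain string options. The sketch below builds one option set per flow. The helper names are hypothetical, and which options each flow strictly requires is an assumption here, since this page lists the options without marking them as required or optional.

```python
from typing import Dict, Optional, Sequence

OAUTH_PREFIX = "adbc.flight.sql.oauth."


def client_credentials_options(
    client_id: str,
    client_secret: str,
    token_uri: str,
    scopes: Optional[Sequence[str]] = None,
) -> Dict[str, str]:
    """Options for the client_credentials flow."""
    options = {
        OAUTH_PREFIX + "flow": "client_credentials",
        OAUTH_PREFIX + "client_id": client_id,
        OAUTH_PREFIX + "client_secret": client_secret,
        OAUTH_PREFIX + "token_uri": token_uri,
    }
    if scopes:
        # The scope option is a space-separated list.
        options[OAUTH_PREFIX + "scope"] = " ".join(scopes)
    return options


def token_exchange_options(
    subject_token: str,
    token_uri: str,
    subject_token_type: str = "urn:ietf:params:oauth:token-type:jwt",
) -> Dict[str, str]:
    """Options for the token_exchange flow with a single subject token."""
    return {
        OAUTH_PREFIX + "flow": "token_exchange",
        OAUTH_PREFIX + "token_uri": token_uri,
        OAUTH_PREFIX + "exchange.subject_token": subject_token,
        OAUTH_PREFIX + "exchange.subject_token_type": subject_token_type,
    }


# Placeholder values; the token endpoint URL is hypothetical.
print(client_credentials_options(
    "my-client", "my-secret", "https://auth.example.com/token",
    scopes=["read.all", "offline_access"],
))
```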

Distributed Result Sets

The driver will fetch all partitions (FlightEndpoints) returned by the server, in an unspecified order (note that Flight SQL itself does not define an ordering on these partitions). If an endpoint has no locations, the data will be fetched using the original server connection. Otherwise, the driver will try each location given, in order, until a request succeeds: if the connection or request to one location fails, it moves on to the next.

The driver does not currently cache or pool these secondary connections. It also does not retry connections or requests.

All partitions are fetched in parallel. A limited number of batches are queued per partition. Data is returned to the client in the order of the partitions.
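The per-endpoint location handling described above can be sketched in a few lines. This is an illustration of the documented behavior, not the driver's actual code: the function names and the callables passed in are hypothetical stand-ins for Flight DoGet requests.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def fetch_with_fallback(
    locations: Sequence[str],
    fetch_from_location: Callable[[str], T],
    fetch_from_origin: Callable[[], T],
) -> T:
    """Fetch one endpoint: use the origin if no locations are given,
    otherwise try each location in order until one succeeds."""
    if not locations:
        return fetch_from_origin()
    errors: List[Exception] = []
    for location in locations:
        try:
            return fetch_from_location(location)
        except ConnectionError as exc:
            # No retry of the same location; move on to the next one.
            errors.append(exc)
    raise ConnectionError(f"all {len(locations)} locations failed: {errors}")


def fake_fetch(location: str) -> str:
    # Stand-in for a DoGet call; the first host is simulated as unreachable.
    if location == "grpc://down.example.com:8080":
        raise ConnectionError("unreachable")
    return f"data from {location}"


print(fetch_with_fallback(
    ["grpc://down.example.com:8080", "grpc://up.example.com:8080"],
    fake_fetch,
    lambda: "data from origin",
))
```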

Some behavior can be configured on the AdbcStatement:

adbc.rpc.result_queue_size

The number of batches to queue per partition. Defaults to 5.

Python: adbc_driver_flightsql.StatementOptions.QUEUE_SIZE

Incremental Execution

By setting ADBC_STATEMENT_OPTION_INCREMENTAL, you can use nonblocking execution with this driver. This changes the behavior of AdbcStatementExecutePartitions() only. When enabled, ExecutePartitions will return every time there are new partitions (in Flight SQL terms, when there are new FlightEndpoints) from the server, instead of blocking until the query is complete.

Some behavior can be configured on the AdbcStatement:

adbc.flight.sql.statement.exec.last_flight_info

Get the serialized bytes for the most recent FlightInfo returned by the service. This is a low-level option intended for advanced usage. It is most useful when incremental execution is enabled, for inspecting the latest server response without waiting for AdbcStatementExecutePartitions() to return.

Python: adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO

Metadata

The driver does not currently populate column constraint info (foreign keys, primary keys, etc.) in AdbcConnectionGetObjects(). Also, catalog filters are evaluated as simple string matches, not LIKE-style patterns.

Partitioned Result Sets

The Flight SQL driver supports ADBC’s partitioned result sets. When requested, each partition of a result set contains a serialized FlightInfo, containing one of the FlightEndpoints of the original response. Clients who may wish to introspect the partition can do so by deserializing the contained FlightInfo from the ADBC partitions. (For example, a client that wishes to distribute work across multiple workers or machines may want to try to take advantage of locality information that ADBC does not have.)

Sessions

The driver exposes Flight SQL session support via options on the connection. There is no explicit command to start a new session; it is expected that the server itself will manage this. (You will most likely need to enable cookie support as described above.) There is no explicit command to close a session; this is always issued when the connection is closed.

adbc.flight.sql.session.options

Get all options as a JSON blob.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS

adbc.flight.sql.session.option.

Get or set a string/numeric session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionerase.

Erase a session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionbool.

Get or set a boolean session option.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionstringlist.

Get or set a string list session option. The contents should be a serialized JSON list.

Python: adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
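Because each session option is addressed by a prefix plus the option name, it can help to build the keys programmatically. The sketch below uses a hypothetical helper and a made-up session option name ("search_path"); the prefixes are the ones listed above, and the string list value is serialized as a JSON list as described.

```python
import json

SESSION_PREFIXES = {
    "string": "adbc.flight.sql.session.option.",
    "bool": "adbc.flight.sql.session.optionbool.",
    "stringlist": "adbc.flight.sql.session.optionstringlist.",
    "erase": "adbc.flight.sql.session.optionerase.",
}


def session_option_key(name: str, kind: str = "string") -> str:
    """Return the connection option key for a session option of the given kind."""
    return SESSION_PREFIXES[kind] + name


# "search_path" is a hypothetical session option name.
key = session_option_key("search_path", kind="stringlist")
value = json.dumps(["main", "analytics"])
print(key, value)
```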

Timeouts

By default, timeouts are not used for RPC calls. They can be set via special options on AdbcConnection. In general, it is best practice to set timeouts to avoid unexpectedly getting stuck. The options are as follows:

adbc.flight.sql.rpc.timeout_seconds.fetch

A timeout (in floating-point seconds) for any API calls that fetch data. This corresponds to Flight DoGet calls.

For example, this controls the timeout of the underlying Flight calls that fetch more data as a result set is consumed.

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH

adbc.flight.sql.rpc.timeout_seconds.query

A timeout (in floating-point seconds) for any API calls that execute a query. This corresponds to Flight GetFlightInfo calls.

For example, this controls the timeout of the underlying Flight calls that implement AdbcStatementExecuteQuery().

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY

adbc.flight.sql.rpc.timeout_seconds.update

A timeout (in floating-point seconds) for any API calls that upload data or perform other updates.

For example, this controls the timeout of the underlying Flight calls that implement bulk ingestion, or transaction support.

Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE

There is also a timeout that is set on the AdbcDatabase:

adbc.flight.sql.rpc.timeout_seconds.connect

A timeout (in floating-point seconds) for establishing a connection. The default is 20 seconds.
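As a sketch of how these timeouts might be assembled, the helper below builds the connection-level options and keeps the connect timeout separate, since it belongs on the AdbcDatabase. The helper name is hypothetical, and passing the values as strings is an assumption; in Python the two dictionaries would typically be supplied as conn_kwargs and db_kwargs respectively.

```python
from typing import Dict, Optional

TIMEOUT_PREFIX = "adbc.flight.sql.rpc.timeout_seconds."


def connection_timeout_options(
    fetch: Optional[float] = None,
    query: Optional[float] = None,
    update: Optional[float] = None,
) -> Dict[str, str]:
    """Build AdbcConnection timeout options; unset timeouts are omitted."""
    options: Dict[str, str] = {}
    for name, seconds in (("fetch", fetch), ("query", query), ("update", update)):
        if seconds is not None:
            # Values are floating-point seconds, serialized as strings.
            options[TIMEOUT_PREFIX + name] = str(float(seconds))
    return options


conn_kwargs = connection_timeout_options(fetch=30, query=5.5)
# The connect timeout is a database-level option.
db_kwargs = {TIMEOUT_PREFIX + "connect": str(float(10))}
print(conn_kwargs)
print(db_kwargs)
```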

Transactions

The driver supports transactions. It will first check the server’s SqlInfo to determine whether the server supports them. If it does not, transaction-related ADBC APIs will return ADBC_STATUS_NOT_IMPLEMENTED.