Flight SQL Driver¶
Language: Go Status: Stable
The Flight SQL Driver provides access to any database implementing a Arrow Flight SQL compatible endpoint.
Installation¶
Install libadbc-driver-flightsql from conda-forge:
mamba install libadbc-driver-flightsql
Install the C/C++ driver, then use the Go driver manager. Requires CGO.
go get github.com/apache/arrow-adbc/go/adbc/drivermgr
Install adbc-driver-flightsql from conda-forge:
mamba install adbc-driver-flightsql
Install adbc-driver-flightsql from PyPI:
pip install adbc-driver-flightsql
Install adbcflightsql from R-multiverse:
install.packages("adbcflightsql", repos = "https://community.r-multiverse.org")
Additionally, the driver may be used from C/C++, C#, GLib, Go, R, Ruby, and Rust via the driver manager.
Usage¶
To connect to a database, supply the “uri” parameter when constructing
the AdbcDatabase.
#include "arrow-adbc/adbc.h"
// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);
Note
For detailed examples, see Flight SQL Recipes.
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect
headers = {"foo": "bar"}
with connect(
    "grpc+tls://localhost:8080",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
        DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
        **{
            f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
            for k, v in headers.items()
        },
    }
) as conn:
    pass
Recipe source: example_usage_test.go
 20// Tests that use the SQLite server example.
 21
 22package flightsql_test
 23
 24import (
 25	"context"
 26	"database/sql"
 27	"errors"
 28	"fmt"
 29	"log"
 30
 31	"github.com/apache/arrow-adbc/go/adbc"
 32	drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
 33	"github.com/apache/arrow-go/v18/arrow/array"
 34	"github.com/apache/arrow-go/v18/arrow/flight"
 35	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
 36	sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
 37	"github.com/apache/arrow-go/v18/arrow/memory"
 38	_ "modernc.org/sqlite"
 39)
 40
 41var headers = map[string]string{"foo": "bar"}
 42
 43func FlightSQLExample(uri string) (err error) {
 44	ctx := context.Background()
 45	options := map[string]string{
 46		adbc.OptionKeyURI: uri,
 47	}
 48
 49	for k, v := range headers {
 50		options[drv.OptionRPCCallHeaderPrefix+k] = v
 51	}
 52
 53	var alloc memory.Allocator
 54	drv := drv.NewDriver(alloc)
 55	db, err := drv.NewDatabase(options)
 56	if err != nil {
 57		return fmt.Errorf("failed to open database: %s\n", err.Error())
 58	}
 59	defer func() {
 60		err = errors.Join(err, db.Close())
 61	}()
 62
 63	cnxn, err := db.Open(ctx)
 64	if err != nil {
 65		return fmt.Errorf("failed to open connection: %s", err.Error())
 66	}
 67	defer func() {
 68		err = errors.Join(err, cnxn.Close())
 69	}()
 70
 71	stmt, err := cnxn.NewStatement()
 72	if err != nil {
 73		return fmt.Errorf("failed to create statement: %s", err.Error())
 74	}
 75	defer func() {
 76		err = errors.Join(err, stmt.Close())
 77	}()
 78
 79	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
 80		return fmt.Errorf("failed to set query: %s", err.Error())
 81	}
 82
 83	reader, _, err := stmt.ExecuteQuery(ctx)
 84	if err != nil {
 85		return fmt.Errorf("failed to execute query: %s", err.Error())
 86	}
 87	defer reader.Release()
 88
 89	for reader.Next() {
 90		arr, ok := reader.RecordBatch().Column(0).(*array.Int64)
 91		if !ok {
 92			return fmt.Errorf("result data was not int64")
 93		}
 94		for i := 0; i < arr.Len(); i++ {
 95			if arr.IsNull(i) {
 96				fmt.Println("theresult: NULL")
 97			} else {
 98				fmt.Printf("theresult: %d\n", arr.Value(i))
 99			}
100		}
101	}
102
103	return nil
104}
105
106func Example() {
107	// For this example we will spawn the Flight SQL server ourselves.
108
109	// Create a new database that isn't tied to any other databases that
110	// may be in process.
111	db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
112	if err != nil {
113		log.Fatal(err)
114	}
115	defer func() {
116		err := db.Close()
117		if err != nil {
118			log.Fatal(err)
119		}
120	}()
121
122	srv, err := sqlite.NewSQLiteFlightSQLServer(db)
123	if err != nil {
124		log.Fatal(err)
125	}
126
127	server := flight.NewServerWithMiddleware(nil)
128	server.RegisterFlightService(flightsql.NewFlightServer(srv))
129	err = server.Init("localhost:8080")
130	if err != nil {
131		log.Fatal(err)
132	}
133
134	go func() {
135		if err := server.Serve(); err != nil {
136			log.Fatal(err)
137		}
138	}()
139
140	uri := fmt.Sprintf("grpc://%s", server.Addr().String())
141	if err := FlightSQLExample(uri); err != nil {
142		log.Printf("Error: %s\n", err.Error())
143	}
144
145	server.Shutdown()
146
147	// Output:
148	// theresult: 1
149}
theresult: 1
Supported Features¶
The Flight SQL driver generally supports features defined in the ADBC API specification 1.0.0, as well as some additional, custom options.
Warning
The Java driver does not support all options here. See issue #745.
Authentication¶
The driver does no authentication by default. The driver implements a few optional authentication schemes:
- Mutual TLS (mTLS): see “Client Options” below. 
- An HTTP-style scheme mimicking the Arrow Flight SQL JDBC driver. - Set the options - usernameand- passwordon the- AdbcDatabase. Alternatively, set the option- adbc.flight.sql.authorization_headerfor full control.- The client provides credentials sending an - authorizationfrom client to server. The server then responds with an- authorizationheader on the first request. The value of this header will then be sent back as the- authorizationheader on all future requests.
- OAuth 2.0 authentication flows. - The client provides configurations to allow client application to obtain access tokens from an authorization server. The obtained token is then used on the - authorizationheader on all future requests.
Bulk Ingestion¶
Flight SQL does not have a dedicated API for bulk ingestion of Arrow data into a given table. The driver does not currently implement bulk ingestion as a result.
Client Options¶
The options used for creating the Flight RPC client can be customized.
Note
Many of these options simply wrap a gRPC option. For more details about what these options do, consult the gRPC documentation.
- adbc.flight.sql.client_option.authority
- Override gRPC’s - :authoritypseudo-header.
- adbc.flight.sql.client_option.mtls_cert_chain
- The certificate chain to use for mTLS. - Python: - adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN
- adbc.flight.sql.client_option.mtls_private_key
- The private key to use for mTLS. - Python: - adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY
- adbc.flight.sql.client_option.tls_override_hostname
- Override the hostname used to verify the server’s TLS certificate. - Python: - adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME
- adbc.flight.sql.client_option.tls_root_certs
- Override the root certificates used to validate the server’s TLS certificate. - Python: - adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS
- adbc.flight.sql.client_option.tls_skip_verify
- Disable verification of the server’s TLS certificate. Value should be - trueor- false.- Python: - adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY
- adbc.flight.sql.client_option.with_block
- Warning - This option is deprecated as gRPC itself has deprecated the underlying option. - This option has no effect and will be removed in a future release. Value should be - trueor- false.
- adbc.flight.sql.client_option.with_max_msg_size
- The maximum message size to accept from the server. The driver defaults to 16 MiB since Flight services tend to return larger response payloads. Should be a positive integer number of bytes. - Python: - adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE
- adbc.flight.sql.authorization_header
- Directly specify the value of the - authorizationheader to send on all requests.- Python: - adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER
- adbc.flight.sql.rpc.with_cookie_middleware
- Enable or disable middleware that processes and handles “set-cookie” metadata headers returned from the server and sends “Cookie” headers back from the client. Value should be - trueor- false. Default is- false.- Python: - adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
Custom Call Headers¶
Custom HTTP headers can be attached to requests via options that apply
to AdbcDatabase, AdbcConnection, and
AdbcStatement.
- adbc.flight.sql.rpc.call_header.<HEADER NAME>
- Add the header - <HEADER NAME>to outgoing requests with the given value.- Python: - adbc_driver_flightsql.ConnectionOptions.RPC_CALL_HEADER_PREFIX- Warning - Header names must be in all lowercase. 
OAuth 2.0 Options¶
Supported configurations to obtain tokens using OAuth 2.0 authentication flows.
- adbc.flight.sql.oauth.flow
- Specifies the OAuth 2.0 flow type to use. Possible values: - client_credentials,- token_exchange
- adbc.flight.sql.oauth.client_id
- Unique identifier issued to the client application by the authorization server 
- adbc.flight.sql.oauth.client_secret
- Secret associated to the client_id. Used to authenticate the client application to the authorization server 
- adbc.flight.sql.oauth.token_uri
- The endpoint URL where the client application requests tokens from the authorization server 
- adbc.flight.sql.oauth.scope
- Space-separated list of permissions that the client is requesting access to (e.g - "read.all offline_access")
- adbc.flight.sql.oauth.exchange.subject_token
- The security token that the client application wants to exchange 
- adbc.flight.sql.oauth.exchange.subject_token_type
- Identifier for the type of the subject token. Check list below for supported token types. 
- adbc.flight.sql.oauth.exchange.actor_token
- A security token that represents the identity of the acting party 
- adbc.flight.sql.oauth.exchange.actor_token_type
- Identifier for the type of the actor token. Check list below for supported token types. 
- adbc.flight.sql.oauth.exchange.aud
- The intended audience for the requested security token 
- adbc.flight.sql.oauth.exchange.resource
- The resource server where the client intends to use the requested security token 
- adbc.flight.sql.oauth.exchange.scope
- Specific permissions requested for the new token 
- adbc.flight.sql.oauth.exchange.requested_token_type
- The type of token the client wants to receive in exchange. Check list below for supported token types. 
- Supported token types:
- urn:ietf:params:oauth:token-type:access_token
- urn:ietf:params:oauth:token-type:refresh_token
- urn:ietf:params:oauth:token-type:id_token
- urn:ietf:params:oauth:token-type:saml1
- urn:ietf:params:oauth:token-type:saml2
- urn:ietf:params:oauth:token-type:jwt
 
Distributed Result Sets¶
The driver will fetch all partitions (FlightEndpoints) returned by the server, in an unspecified order (note that Flight SQL itself does not define an ordering on these partitions). If an endpoint has no locations, the data will be fetched using the original server connection. Else, the driver will try each location given, in order, until a request succeeds. If the connection or request fails, it will try the next location.
The driver does not currently cache or pool these secondary connections. It also does not retry connections or requests.
All partitions are fetched in parallel. A limited number of batches are queued per partition. Data is returned to the client in the order of the partitions.
Some behavior can be configured on the AdbcStatement:
- adbc.rpc.result_queue_size
- The number of batches to queue per partition. Defaults to 5. 
Incremental Execution¶
By setting ADBC_STATEMENT_OPTION_INCREMENTAL, you can use
nonblocking execution with this driver.  This changes the behavior of
AdbcStatementExecutePartitions() only.  When enabled,
ExecutePartitions will return every time there are new partitions (in Flight
SQL terms, when there are new FlightEndpoints) from the server, instead of
blocking until the query is complete.
Some behavior can be configured on the AdbcStatement:
- adbc.flight.sql.statement.exec.last_flight_info
- Get the serialized bytes for the most recent - FlightInforeturned by the service. This is a low-level option intended for advanced usage. It is most useful when incremental execution is enabled, for inspecting the latest server response without waiting for- AdbcStatementExecutePartitions()to return.- Python: - adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO
Metadata¶
The driver currently will not populate column constraint info (foreign
keys, primary keys, etc.) in AdbcConnectionGetObjects().
Also, catalog filters are evaluated as simple string matches, not
LIKE-style patterns.
Partitioned Result Sets¶
The Flight SQL driver supports ADBC’s partitioned result sets. When requested, each partition of a result set contains a serialized FlightInfo, containing one of the FlightEndpoints of the original response. Clients who may wish to introspect the partition can do so by deserializing the contained FlightInfo from the ADBC partitions. (For example, a client that wishes to distribute work across multiple workers or machines may want to try to take advantage of locality information that ADBC does not have.)
Sessions¶
The driver exposes Flight SQL session support via options on the connection. There is no explicit command to start a new session; it is expected that the server itself will manage this. (You will most likely need to enable cookie support as described above.) There is no explicit command to close a session; this is always issued when the connection is closed.
- adbc.flight.sql.session.options
- Get all options as a JSON blob. - Python: - adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS
- adbc.flight.sql.session.option.
- Get or set a string/numeric session option. - Python: - adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionerase.
- Erase a session option. - Python: - adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionbool.
- Get or set a boolean session option. - Python: - adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionstringlist.
- Get or set a string list session option. The contents should be a serialized JSON list. - Python: - adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
Timeouts¶
By default, timeouts are not used for RPC calls.  They can be set via
special options on AdbcConnection.  In general, it is
best practice to set timeouts to avoid unexpectedly getting stuck.
The options are as follows:
- adbc.flight.sql.rpc.timeout_seconds.fetch
- A timeout (in floating-point seconds) for any API calls that fetch data. This corresponds to Flight - DoGetcalls.- For example, this controls the timeout of the underlying Flight calls that fetch more data as a result set is consumed. - Python: - adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH
- adbc.flight.sql.rpc.timeout_seconds.query
- A timeout (in floating-point seconds) for any API calls that execute a query. This corresponds to Flight - GetFlightInfocalls.- For example, this controls the timeout of the underlying Flight calls that implement - AdbcStatementExecuteQuery().- Python: - adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY
- adbc.flight.sql.rpc.timeout_seconds.update
- A timeout (in floating-point seconds) for any API calls that upload data or perform other updates. - For example, this controls the timeout of the underlying Flight calls that implement bulk ingestion, or transaction support. - Python: - adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE
There is also a timeout that is set on the AdbcDatabase:
- adbc.flight.sql.rpc.timeout_seconds.connect
- A timeout (in floating-point seconds) for establishing a connection. The default is 20 seconds. 
Transactions¶
The driver supports transactions.  It will first check the server’s
SqlInfo to determine whether this is supported.  Otherwise,
transaction-related ADBC APIs will return
ADBC_STATUS_NOT_IMPLEMENTED.