Flight SQL Driver¶
Available for: C/C++, GLib/Ruby, Go, Java, Python, R
The Flight SQL Driver provides access to any database that implements an Arrow Flight SQL-compatible endpoint.
Installation¶
C/C++ (for conda-forge users):
mamba install libadbc-driver-flightsql
Go:
go get github.com/apache/arrow-adbc/go/adbc
Java:
Add a dependency on org.apache.arrow.adbc:adbc-driver-flight-sql.
For Maven users:
<dependency>
  <groupId>org.apache.arrow.adbc</groupId>
  <artifactId>adbc-driver-flight-sql</artifactId>
</dependency>
Python:
# For conda-forge
mamba install adbc-driver-flightsql
# For pip
pip install adbc_driver_flightsql
R:
# install.packages("pak")
pak::pak("apache/arrow-adbc/r/adbcflightsql")
Usage¶
To connect to a database, supply the “uri” parameter when constructing
the AdbcDatabase.
#include "arrow-adbc/adbc.h"
// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);
Note
For detailed examples, see Flight SQL Recipes.
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect
headers = {"foo": "bar"}
with connect(
    "grpc+tls://localhost:8080",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
        DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
        **{
            f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
            for k, v in headers.items()
        },
    }
) as conn:
    pass
Recipe source: example_usage_test.go
// Tests that use the SQLite server example.

package flightsql_test

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/apache/arrow-adbc/go/adbc"
	drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/flight"
	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
	sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
	"github.com/apache/arrow-go/v18/arrow/memory"
	_ "modernc.org/sqlite"
)

var headers = map[string]string{"foo": "bar"}

func FlightSQLExample(uri string) error {
	ctx := context.Background()
	options := map[string]string{
		adbc.OptionKeyURI: uri,
	}

	for k, v := range headers {
		options[drv.OptionRPCCallHeaderPrefix+k] = v
	}

	var alloc memory.Allocator
	drv := drv.NewDriver(alloc)
	db, err := drv.NewDatabase(options)
	if err != nil {
		return fmt.Errorf("failed to open database: %s\n", err.Error())
	}
	defer db.Close()

	cnxn, err := db.Open(ctx)
	if err != nil {
		return fmt.Errorf("failed to open connection: %s", err.Error())
	}
	defer cnxn.Close()

	stmt, err := cnxn.NewStatement()
	if err != nil {
		return fmt.Errorf("failed to create statement: %s", err.Error())
	}
	defer stmt.Close()

	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
		return fmt.Errorf("failed to set query: %s", err.Error())
	}

	reader, _, err := stmt.ExecuteQuery(ctx)
	if err != nil {
		return fmt.Errorf("failed to execute query: %s", err.Error())
	}
	defer reader.Release()

	for reader.Next() {
		arr, ok := reader.Record().Column(0).(*array.Int64)
		if !ok {
			return fmt.Errorf("result data was not int64")
		}
		for i := 0; i < arr.Len(); i++ {
			if arr.IsNull(i) {
				fmt.Println("theresult: NULL")
			} else {
				fmt.Printf("theresult: %d\n", arr.Value(i))
			}
		}
	}

	return nil
}

func Example() {
	// For this example we will spawn the Flight SQL server ourselves.

	// Create a new database that isn't tied to any other databases that
	// may be in process.
	db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	srv, err := sqlite.NewSQLiteFlightSQLServer(db)
	if err != nil {
		log.Fatal(err)
	}

	server := flight.NewServerWithMiddleware(nil)
	server.RegisterFlightService(flightsql.NewFlightServer(srv))
	err = server.Init("localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		if err := server.Serve(); err != nil {
			log.Fatal(err)
		}
	}()

	uri := fmt.Sprintf("grpc://%s", server.Addr().String())
	if err := FlightSQLExample(uri); err != nil {
		log.Printf("Error: %s\n", err.Error())
	}

	server.Shutdown()

	// Output:
	// theresult: 1
}
Supported Features¶
The Flight SQL driver generally supports features defined in the ADBC API specification 1.0.0, as well as some additional, custom options.
Warning
The Java driver does not support all options here. See issue #745.
Authentication¶
The driver does no authentication by default. The driver implements a few optional authentication schemes:
- Mutual TLS (mTLS): see “Client Options” below.
- An HTTP-style scheme mimicking the Arrow Flight SQL JDBC driver. Set the options username and password on the AdbcDatabase. Alternatively, set the option adbc.flight.sql.authorization_header for full control. The client provides credentials by sending an authorization header from client to server. The server then responds with an authorization header on the first request. The value of this header will then be sent back as the authorization header on all future requests.
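As a minimal sketch of the HTTP-style scheme, the database options can be assembled as a plain dictionary before connecting. The helper name `auth_db_kwargs` is hypothetical, and its rule that the two styles are not mixed is a design choice of this sketch rather than a documented driver requirement. The option keys are the ones named in this section.

```python
AUTHORIZATION_HEADER = "adbc.flight.sql.authorization_header"


def auth_db_kwargs(username=None, password=None, authorization=None):
    """Build database options for the HTTP-style authentication scheme.

    Hypothetical helper: pass either username and password, or a complete
    authorization header value, but not both.
    """
    if authorization is not None:
        if username is not None or password is not None:
            raise ValueError("use either an authorization header or username/password")
        return {AUTHORIZATION_HEADER: authorization}
    if username is None or password is None:
        raise ValueError("both username and password are required")
    return {"username": username, "password": password}


print(auth_db_kwargs(username="user", password="secret"))
print(auth_db_kwargs(authorization="Bearer <token>"))
```

The resulting dictionary can be passed as db_kwargs to adbc_driver_flightsql.dbapi.connect, in the same way as the Python example under Usage.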
Bulk Ingestion¶
Flight SQL does not have a dedicated API for bulk ingestion of Arrow data into a given table. The driver does not currently implement bulk ingestion as a result.
Client Options¶
The options used for creating the Flight RPC client can be customized.
Note
Many of these options simply wrap a gRPC option. For more details about what these options do, consult the gRPC documentation.
- adbc.flight.sql.client_option.authority
- Override gRPC’s :authority pseudo-header.
- adbc.flight.sql.client_option.mtls_cert_chain
- The certificate chain to use for mTLS. Python: adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN
- adbc.flight.sql.client_option.mtls_private_key
- The private key to use for mTLS. Python: adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY
- adbc.flight.sql.client_option.tls_override_hostname
- Override the hostname used to verify the server’s TLS certificate. Python: adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME
- adbc.flight.sql.client_option.tls_root_certs
- Override the root certificates used to validate the server’s TLS certificate. Python: adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS
- adbc.flight.sql.client_option.tls_skip_verify
- Disable verification of the server’s TLS certificate. Value should be true or false. Python: adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY
- adbc.flight.sql.client_option.with_block
- Warning: This option is deprecated, as gRPC itself has deprecated the underlying option. This option has no effect and will be removed in a future release. Value should be true or false.
- adbc.flight.sql.client_option.with_max_msg_size
- The maximum message size to accept from the server. The driver defaults to 16 MiB since Flight services tend to return larger response payloads. Should be a positive integer number of bytes. Python: adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE
- adbc.flight.sql.authorization_header
- Directly specify the value of the authorization header to send on all requests. Python: adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER
- adbc.flight.sql.rpc.with_cookie_middleware
- Enable or disable middleware that processes and handles “set-cookie” metadata headers returned from the server and sends “Cookie” headers back from the client. Value should be true or false. Default is false. Python: adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
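The client options above are ordinary string key/value pairs, so one way to keep them tidy is a small builder function. The sketch below uses the hypothetical helper `client_db_kwargs`. It assumes the certificate and key options take PEM contents rather than file paths, which this page does not state explicitly, so verify that against your driver version.

```python
CLIENT_OPTION_PREFIX = "adbc.flight.sql.client_option."


def client_db_kwargs(cert_chain_pem=None, private_key_pem=None,
                     root_certs_pem=None, max_msg_size=None):
    """Build database options for mTLS, root certificates, and message size.

    Hypothetical helper. The *_pem arguments are assumed to hold PEM text.
    """
    options = {}
    # mTLS only makes sense when both halves of the key pair are supplied.
    if (cert_chain_pem is None) != (private_key_pem is None):
        raise ValueError("mTLS needs both a certificate chain and a private key")
    if cert_chain_pem is not None:
        options[CLIENT_OPTION_PREFIX + "mtls_cert_chain"] = cert_chain_pem
        options[CLIENT_OPTION_PREFIX + "mtls_private_key"] = private_key_pem
    if root_certs_pem is not None:
        options[CLIENT_OPTION_PREFIX + "tls_root_certs"] = root_certs_pem
    if max_msg_size is not None:
        # The option is documented as a positive integer number of bytes.
        if isinstance(max_msg_size, bool) or not isinstance(max_msg_size, int):
            raise ValueError("max_msg_size must be an integer")
        if max_msg_size <= 0:
            raise ValueError("max_msg_size must be positive")
        options[CLIENT_OPTION_PREFIX + "with_max_msg_size"] = str(max_msg_size)
    return options


# Raise the message size limit to 64 MiB.
print(client_db_kwargs(max_msg_size=64 * 1024 * 1024))
```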
Custom Call Headers¶
Custom HTTP headers can be attached to requests via options that apply
to AdbcDatabase, AdbcConnection, and
AdbcStatement.
- adbc.flight.sql.rpc.call_header.<HEADER NAME>
- Add the header <HEADER NAME> to outgoing requests with the given value. Warning: Header names must be in all lowercase.
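A short sketch of how the header option keys can be derived from a plain mapping, mirroring the dictionary comprehension in the Python usage example. The helper name `call_header_options` is hypothetical; the lowercase check simply enforces the warning above on the client side before the options reach the driver.

```python
CALL_HEADER_PREFIX = "adbc.flight.sql.rpc.call_header."


def call_header_options(headers):
    """Map header names to driver option keys, rejecting non-lowercase names."""
    options = {}
    for name, value in headers.items():
        if name != name.lower():
            raise ValueError(f"header name must be lowercase: {name!r}")
        options[CALL_HEADER_PREFIX + name] = value
    return options


print(call_header_options({"x-request-id": "abc123"}))
# prints {'adbc.flight.sql.rpc.call_header.x-request-id': 'abc123'}
```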
Distributed Result Sets¶
The driver will fetch all partitions (FlightEndpoints) returned by the server, in an unspecified order (note that Flight SQL itself does not define an ordering on these partitions). If an endpoint has no locations, the data will be fetched using the original server connection. Otherwise, the driver will try each location given, in order, until a request succeeds. If the connection or request fails, it will try the next location.
The driver does not currently cache or pool these secondary connections. It also does not retry connections or requests.
All partitions are fetched in parallel. A limited number of batches are queued per partition. Data is returned to the client in the order of the partitions.
Some behavior can be configured on the AdbcStatement:
- adbc.rpc.result_queue_size
- The number of batches to queue per partition. Defaults to 5. 
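The queueing behavior described in this section can be illustrated with a small simulation. This is not the driver's implementation; it is a sketch in which each partition is a plain Python iterable, one thread per partition fills a bounded queue, and the consumer drains the queues in partition order. The function name `read_partitions` and its `queue_size` parameter are hypothetical stand-ins for the driver's behavior and the adbc.rpc.result_queue_size option.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of one partition


def read_partitions(partitions, queue_size=5):
    """Yield batches in partition order while all partitions fetch in parallel.

    `partitions` is a list of iterables standing in for Flight endpoints.
    """
    queues = [queue.Queue(maxsize=queue_size) for _ in partitions]

    def worker(source, out):
        for batch in source:
            out.put(batch)  # blocks while `queue_size` batches are waiting
        out.put(_DONE)

    threads = [
        threading.Thread(target=worker, args=(src, q), daemon=True)
        for src, q in zip(partitions, queues)
    ]
    for thread in threads:
        thread.start()

    # Drain one partition completely before moving on to the next.
    for q in queues:
        while True:
            batch = q.get()
            if batch is _DONE:
                break
            yield batch


print(list(read_partitions([["a1", "a2"], ["b1"], ["c1", "c2", "c3"]])))
# prints ['a1', 'a2', 'b1', 'c1', 'c2', 'c3']
```

In this model, a larger queue lets later partitions buffer more data while an earlier partition is still being consumed, at the cost of more memory.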
Incremental Execution¶
By setting ADBC_STATEMENT_OPTION_INCREMENTAL, you can use
nonblocking execution with this driver.  This changes the behavior of
AdbcStatementExecutePartitions() only.  When enabled,
ExecutePartitions will return every time there are new partitions (in Flight
SQL terms, when there are new FlightEndpoints) from the server, instead of
blocking until the query is complete.
Some behavior can be configured on the AdbcStatement:
- adbc.flight.sql.statement.exec.last_flight_info
- Get the serialized bytes for the most recent FlightInfo returned by the service. This is a low-level option intended for advanced usage. It is most useful when incremental execution is enabled, for inspecting the latest server response without waiting for AdbcStatementExecutePartitions() to return. Python: adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO
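With incremental execution enabled, a client typically calls ExecutePartitions in a loop. The sketch below shows the shape of such a loop using a fake in place of a real statement. The callable `execute_partitions` is hypothetical and stands in for one call to AdbcStatementExecutePartitions(). The loop assumes that an empty result signals that the query has finished, which follows my reading of the ADBC API specification and is not stated on this page.

```python
def drain_partitions(execute_partitions):
    """Collect partitions from repeated incremental ExecutePartitions calls.

    `execute_partitions` is a hypothetical zero-argument callable returning a
    list of serialized partitions. An empty list is assumed to mean "done".
    """
    collected = []
    while True:
        new_partitions = execute_partitions()
        if not new_partitions:
            break
        # A real client could hand these to workers right away instead.
        collected.extend(new_partitions)
    return collected


# Fake statement that reports partitions in two rounds, then finishes.
rounds = iter([[b"p0"], [b"p1", b"p2"], []])
print(drain_partitions(lambda: next(rounds)))
# prints [b'p0', b'p1', b'p2']
```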
Metadata¶
The driver currently will not populate column constraint info (foreign
keys, primary keys, etc.) in AdbcConnectionGetObjects().
Also, catalog filters are evaluated as simple string matches, not
LIKE-style patterns.
Partitioned Result Sets¶
The Flight SQL driver supports ADBC’s partitioned result sets. When requested, each partition of a result set contains a serialized FlightInfo, containing one of the FlightEndpoints of the original response. Clients who may wish to introspect the partition can do so by deserializing the contained FlightInfo from the ADBC partitions. (For example, a client that wishes to distribute work across multiple workers or machines may want to try to take advantage of locality information that ADBC does not have.)
Sessions¶
The driver exposes Flight SQL session support via options on the connection. There is no explicit command to start a new session; it is expected that the server itself will manage this. (You will most likely need to enable cookie support as described above.) There is also no explicit command to close a session; the driver always issues a session close when the connection is closed.
- adbc.flight.sql.session.options
- Get all options as a JSON blob. Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS
- adbc.flight.sql.session.option.
- Get or set a string/numeric session option. Python: adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionerase.
- Erase a session option. Python: adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionbool.
- Get or set a boolean session option. Python: adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX
- adbc.flight.sql.session.optionstringlist.
- Get or set a string list session option. The contents should be a serialized JSON list. Python: adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
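Because each kind of session option uses its own key prefix, it can help to pick the prefix from the Python type of the value. The following sketch does that with the hypothetical helper `session_option`. Encoding booleans as the strings true and false is an assumption, chosen to match how other boolean options on this page are written.

```python
import json

SESSION_OPTION_PREFIX = "adbc.flight.sql.session.option."
SESSION_BOOL_PREFIX = "adbc.flight.sql.session.optionbool."
SESSION_STRING_LIST_PREFIX = "adbc.flight.sql.session.optionstringlist."


def session_option(name, value):
    """Return the (key, value) pair to set on the connection for one option."""
    # bool must be checked first: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        return SESSION_BOOL_PREFIX + name, "true" if value else "false"
    if isinstance(value, (list, tuple)):
        # String lists are passed as a serialized JSON list.
        return SESSION_STRING_LIST_PREFIX + name, json.dumps(list(value))
    return SESSION_OPTION_PREFIX + name, str(value)


print(session_option("catalog", "main"))
print(session_option("tags", ["a", "b"]))
```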
Timeouts¶
By default, timeouts are not used for RPC calls.  They can be set via
special options on AdbcConnection.  In general, it is
best practice to set timeouts to avoid unexpectedly getting stuck.
The options are as follows:
- adbc.flight.sql.rpc.timeout_seconds.fetch
- A timeout (in floating-point seconds) for any API calls that fetch data. This corresponds to Flight DoGet calls. For example, this controls the timeout of the underlying Flight calls that fetch more data as a result set is consumed. Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH
- adbc.flight.sql.rpc.timeout_seconds.query
- A timeout (in floating-point seconds) for any API calls that execute a query. This corresponds to Flight GetFlightInfo calls. For example, this controls the timeout of the underlying Flight calls that implement AdbcStatementExecuteQuery(). Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY
- adbc.flight.sql.rpc.timeout_seconds.update
- A timeout (in floating-point seconds) for any API calls that upload data or perform other updates. For example, this controls the timeout of the underlying Flight calls that implement bulk ingestion, or transaction support. Python: adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE
There is also a timeout that is set on the AdbcDatabase:
- adbc.flight.sql.rpc.timeout_seconds.connect
- A timeout (in floating-point seconds) for establishing a connection. The default is 20 seconds. 
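A minimal sketch of collecting the connection-level timeouts into one options dictionary. The helper name `connection_timeouts` is hypothetical, and rejecting negative values is a sanity check added by this sketch, not documented driver behavior. Values are formatted as strings holding floating-point seconds.

```python
TIMEOUT_PREFIX = "adbc.flight.sql.rpc.timeout_seconds."


def connection_timeouts(fetch=None, query=None, update=None):
    """Build AdbcConnection timeout options from seconds given as numbers."""
    options = {}
    for kind, seconds in (("fetch", fetch), ("query", query), ("update", update)):
        if seconds is None:
            continue  # leave this timeout unset
        if seconds < 0:
            raise ValueError(f"{kind} timeout must not be negative")
        options[TIMEOUT_PREFIX + kind] = str(float(seconds))
    return options


print(connection_timeouts(query=30, fetch=2.5))
# prints {'adbc.flight.sql.rpc.timeout_seconds.fetch': '2.5', 'adbc.flight.sql.rpc.timeout_seconds.query': '30.0'}
```

With the Python DB-API wrapper, a dictionary like this could be applied with something along the lines of conn.adbc_connection.set_options(**connection_timeouts(query=30)); treat that call as an assumption to check against the driver manager version in use.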
Transactions¶
The driver supports transactions.  It first checks the server’s
SqlInfo to determine whether transactions are supported.  If they are
not, transaction-related ADBC APIs will return
ADBC_STATUS_NOT_IMPLEMENTED.