Flight SQL Driver¶
Available for: C/C++, GLib/Ruby, Go, Java, Python, R
The Flight SQL Driver provides access to any database implementing a Arrow Flight SQL compatible endpoint.
Installation¶
For conda-forge users:
mamba install libadbc-driver-flightsql
go get github.com/apache/arrow-adbc/go/adbc
Add a dependency on org.apache.arrow.adbc:adbc-driver-flight-sql
.
For Maven users:
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
</dependency>
# For conda-forge
mamba install adbc-driver-flightsql
# For pip
pip install adbc_driver_flightsql
# install.packages("pak")
pak::pak("apache/arrow-adbc/r/adbcflightsql")
Usage¶
To connect to a database, supply the “uri” parameter when constructing
the AdbcDatabase
.
#include "arrow-adbc/adbc.h"
// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);
Note
For detailed examples, see Flight SQL Recipes.
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect
headers = {"foo": "bar"}
with connect(
"grpc+tls://localhost:8080",
db_kwargs={
DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
**{
f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
for k, v in headers.items()
},
}
) as conn:
pass
Recipe source: example_usage_test.go
20// Tests that use the SQLite server example.
21
22package flightsql_test
23
24import (
25 "context"
26 "database/sql"
27 "fmt"
28 "log"
29
30 "github.com/apache/arrow-adbc/go/adbc"
31 drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
32 "github.com/apache/arrow-go/v18/arrow/array"
33 "github.com/apache/arrow-go/v18/arrow/flight"
34 "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
35 sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
36 "github.com/apache/arrow-go/v18/arrow/memory"
37 _ "modernc.org/sqlite"
38)
39
40var headers = map[string]string{"foo": "bar"}
41
42func FlightSQLExample(uri string) error {
43 ctx := context.Background()
44 options := map[string]string{
45 adbc.OptionKeyURI: uri,
46 }
47
48 for k, v := range headers {
49 options[drv.OptionRPCCallHeaderPrefix+k] = v
50 }
51
52 var alloc memory.Allocator
53 drv := drv.NewDriver(alloc)
54 db, err := drv.NewDatabase(options)
55 if err != nil {
56 return fmt.Errorf("failed to open database: %s\n", err.Error())
57 }
58 defer db.Close()
59
60 cnxn, err := db.Open(ctx)
61 if err != nil {
62 return fmt.Errorf("failed to open connection: %s", err.Error())
63 }
64 defer cnxn.Close()
65
66 stmt, err := cnxn.NewStatement()
67 if err != nil {
68 return fmt.Errorf("failed to create statement: %s", err.Error())
69 }
70 defer stmt.Close()
71
72 if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
73 return fmt.Errorf("failed to set query: %s", err.Error())
74 }
75
76 reader, _, err := stmt.ExecuteQuery(ctx)
77 if err != nil {
78 return fmt.Errorf("failed to execute query: %s", err.Error())
79 }
80 defer reader.Release()
81
82 for reader.Next() {
83 arr, ok := reader.Record().Column(0).(*array.Int64)
84 if !ok {
85 return fmt.Errorf("result data was not int64")
86 }
87 for i := 0; i < arr.Len(); i++ {
88 if arr.IsNull(i) {
89 fmt.Println("theresult: NULL")
90 } else {
91 fmt.Printf("theresult: %d\n", arr.Value(i))
92 }
93 }
94 }
95
96 return nil
97}
98
99func Example() {
100 // For this example we will spawn the Flight SQL server ourselves.
101
102 // Create a new database that isn't tied to any other databases that
103 // may be in process.
104 db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
105 if err != nil {
106 log.Fatal(err)
107 }
108 defer db.Close()
109
110 srv, err := sqlite.NewSQLiteFlightSQLServer(db)
111 if err != nil {
112 log.Fatal(err)
113 }
114
115 server := flight.NewServerWithMiddleware(nil)
116 server.RegisterFlightService(flightsql.NewFlightServer(srv))
117 err = server.Init("localhost:8080")
118 if err != nil {
119 log.Fatal(err)
120 }
121
122 go func() {
123 if err := server.Serve(); err != nil {
124 log.Fatal(err)
125 }
126 }()
127
128 uri := fmt.Sprintf("grpc://%s", server.Addr().String())
129 if err := FlightSQLExample(uri); err != nil {
130 log.Printf("Error: %s\n", err.Error())
131 }
132
133 server.Shutdown()
134
135 // Output:
136 // theresult: 1
137}
Supported Features¶
The Flight SQL driver generally supports features defined in the ADBC API specification 1.0.0, as well as some additional, custom options.
Warning
The Java driver does not support all options here. See issue #745.
Authentication¶
The driver does no authentication by default. The driver implements a few optional authentication schemes:
Mutual TLS (mTLS): see “Client Options” below.
An HTTP-style scheme mimicking the Arrow Flight SQL JDBC driver.
Set the options
username
andpassword
on theAdbcDatabase
. Alternatively, set the optionadbc.flight.sql.authorization_header
for full control.The client provides credentials sending an
authorization
from client to server. The server then responds with anauthorization
header on the first request. The value of this header will then be sent back as theauthorization
header on all future requests.
Bulk Ingestion¶
Flight SQL does not have a dedicated API for bulk ingestion of Arrow data into a given table. The driver does not currently implement bulk ingestion as a result.
Client Options¶
The options used for creating the Flight RPC client can be customized.
Note
Many of these options simply wrap a gRPC option. For more details about what these options do, consult the gRPC documentation.
adbc.flight.sql.client_option.authority
Override gRPC’s
:authority
pseudo-header.adbc.flight.sql.client_option.mtls_cert_chain
The certificate chain to use for mTLS.
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN
adbc.flight.sql.client_option.mtls_private_key
The private key to use for mTLS.
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY
adbc.flight.sql.client_option.tls_override_hostname
Override the hostname used to verify the server’s TLS certificate.
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME
adbc.flight.sql.client_option.tls_root_certs
Override the root certificates used to validate the server’s TLS certificate.
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS
adbc.flight.sql.client_option.tls_skip_verify
Disable verification of the server’s TLS certificate. Value should be
true
orfalse
.Python:
adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY
adbc.flight.sql.client_option.with_block
Warning
This option is deprecated as gRPC itself has deprecated the underlying option.
This option has no effect and will be removed in a future release. Value should be
true
orfalse
.adbc.flight.sql.client_option.with_max_msg_size
The maximum message size to accept from the server. The driver defaults to 16 MiB since Flight services tend to return larger reponse payloads. Should be a positive integer number of bytes.
Python:
adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE
adbc.flight.sql.authorization_header
Directly specify the value of the
authorization
header to send on all requests.Python:
adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER
adbc.flight.sql.rpc.with_cookie_middleware
Enable or disable middleware that processes and handles “set-cookie” metadata headers returned from the server and sends “Cookie” headers back from the client. Value should be
true
orfalse
. Default isfalse
.Python:
adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
Custom Call Headers¶
Custom HTTP headers can be attached to requests via options that apply
to AdbcDatabase
, AdbcConnection
, and
AdbcStatement
.
adbc.flight.sql.rpc.call_header.<HEADER NAME>
Add the header
<HEADER NAME>
to outgoing requests with the given value.Warning
Header names must be in all lowercase.
Distributed Result Sets¶
The driver will fetch all partitions (FlightEndpoints) returned by the server, in an unspecified order (note that Flight SQL itself does not define an ordering on these partitions). If an endpoint has no locations, the data will be fetched using the original server connection. Else, the driver will try each location given, in order, until a request succeeds. If the connection or request fails, it will try the next location.
The driver does not currently cache or pool these secondary connections. It also does not retry connections or requests.
All partitions are fetched in parallel. A limited number of batches are queued per partition. Data is returned to the client in the order of the partitions.
Some behavior can be configured on the AdbcStatement
:
adbc.rpc.result_queue_size
The number of batches to queue per partition. Defaults to 5.
Incremental Execution¶
By setting ADBC_STATEMENT_OPTION_INCREMENTAL
, you can use
nonblocking execution with this driver. This changes the behavior of
AdbcStatementExecutePartitions()
only. When enabled,
ExecutePartitions will return every time there are new partitions (in Flight
SQL terms, when there are new FlightEndpoints) from the server, instead of
blocking until the query is complete.
Some behavior can be configured on the AdbcStatement
:
adbc.flight.sql.statement.exec.last_flight_info
Get the serialized bytes for the most recent
FlightInfo
returned by the service. This is a low-level option intended for advanced usage. It is most useful when incremental execution is enabled, for inspecting the latest server response without waiting forAdbcStatementExecutePartitions()
to return.Python:
adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO
Metadata¶
The driver currently will not populate column constraint info (foreign
keys, primary keys, etc.) in AdbcConnectionGetObjects()
.
Also, catalog filters are evaluated as simple string matches, not
LIKE
-style patterns.
Partitioned Result Sets¶
The Flight SQL driver supports ADBC’s partitioned result sets. When requested, each partition of a result set contains a serialized FlightInfo, containing one of the FlightEndpoints of the original response. Clients who may wish to introspect the partition can do so by deserializing the contained FlightInfo from the ADBC partitions. (For example, a client that wishes to distribute work across multiple workers or machines may want to try to take advantage of locality information that ADBC does not have.)
Sessions¶
The driver exposes Flight SQL session support via options on the connection. There is no explicit command to start a new session; it is expected that the server itself will manage this. (You will most likely need to enable cookie support as described above.) There is no explicit command to close a session; this is always issued when the connection is closed.
adbc.flight.sql.session.options
Get all options as a JSON blob.
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS
adbc.flight.sql.session.option.
Get or set a string/numeric session option.
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionerase.
Erase a session option.
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionbool.
Get or set a boolean session option.
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionstringlist.
Get or set a string list session option. The contents should be a serialized JSON list.
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
Timeouts¶
By default, timeouts are not used for RPC calls. They can be set via
special options on AdbcConnection
. In general, it is
best practice to set timeouts to avoid unexpectedly getting stuck.
The options are as follows:
adbc.flight.sql.rpc.timeout_seconds.fetch
A timeout (in floating-point seconds) for any API calls that fetch data. This corresponds to Flight
DoGet
calls.For example, this controls the timeout of the underlying Flight calls that fetch more data as a result set is consumed.
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH
adbc.flight.sql.rpc.timeout_seconds.query
A timeout (in floating-point seconds) for any API calls that execute a query. This corresponds to Flight
GetFlightInfo
calls.For example, this controls the timeout of the underlying Flight calls that implement
AdbcStatementExecuteQuery()
.Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY
adbc.flight.sql.rpc.timeout_seconds.update
A timeout (in floating-point seconds) for any API calls that upload data or perform other updates.
For example, this controls the timeout of the underlying Flight calls that implement bulk ingestion, or transaction support.
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE
There is also a timeout that is set on the AdbcDatabase
:
adbc.flight.sql.rpc.timeout_seconds.connect
A timeout (in floating-point seconds) for establishing a connection. The default is 20 seconds.
Transactions¶
The driver supports transactions. It will first check the server’s
SqlInfo to determine whether this is supported. Otherwise,
transaction-related ADBC APIs will return
ADBC_STATUS_NOT_IMPLEMENTED
.