Flight SQL Recipes¶
Some of these recipes are written against a demo Flight SQL service backed by SQLite. You can run it yourself as follows:
$ go install github.com/apache/arrow-go/v${ARROW_MAJOR_VERSION}/arrow/flight/flightsql/example/cmd/sqlite_flightsql_server@latest
$ sqlite_flightsql_server -host 0.0.0.0 -port 8080
Other recipes work using the OSS version of Dremio:
$ docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 dremio/dremio-oss
If you have the ADBC repository checked out and Docker Compose installed, you can use our configuration to run both services:
$ docker compose up --detach --wait dremio dremio-init flightsql-sqlite-test
Connect to an unsecured Flight SQL service¶
Recipe source: flightsql_sqlite_connect.py
To connect to an unsecured Flight SQL service, just provide the URI.
22import os
23
24import adbc_driver_flightsql.dbapi
25
26uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
27conn = adbc_driver_flightsql.dbapi.connect(uri)
We can then execute a simple query.
31with conn.cursor() as cur:
32 cur.execute("SELECT 1")
33
34 assert cur.fetchone() == (1,)
35
36conn.close()
Connect to a Flight SQL service with username and password¶
Recipe source: flightsql_dremio_connect.py
Dremio requires a username and password. To connect to a Flight SQL service with authentication, provide the options at connection time.
25import os
26
27import adbc_driver_flightsql.dbapi
28import adbc_driver_manager
29
30uri = os.environ["ADBC_DREMIO_FLIGHTSQL_URI"]
31username = os.environ["ADBC_DREMIO_FLIGHTSQL_USER"]
32password = os.environ["ADBC_DREMIO_FLIGHTSQL_PASS"]
33conn = adbc_driver_flightsql.dbapi.connect(
34 uri,
35 db_kwargs={
36 adbc_driver_manager.DatabaseOptions.USERNAME.value: username,
37 adbc_driver_manager.DatabaseOptions.PASSWORD.value: password,
38 },
39)
We can then execute a simple query.
43with conn.cursor() as cur:
44 cur.execute("SELECT 1")
45
46 assert cur.fetchone() == (1,)
47
48conn.close()
Set timeouts and other options¶
Recipe source: flightsql_sqlite_options.py
The Flight SQL driver supports various options.
22import os
23
24import adbc_driver_flightsql.dbapi
25from adbc_driver_flightsql import ConnectionOptions, DatabaseOptions
26
27uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
We can enable cookie support, which some server implementations require.
29conn = adbc_driver_flightsql.dbapi.connect(
30 uri,
31 db_kwargs={DatabaseOptions.WITH_COOKIE_MIDDLEWARE.value: "true"},
32)
Other options are set on the connection or statement.
For example, we can add a custom header to all outgoing requests.
37custom_header = f"{ConnectionOptions.RPC_CALL_HEADER_PREFIX.value}x-custom-header"
38conn.adbc_connection.set_options(**{custom_header: "value"})
We can also set timeouts. These are in floating-point seconds.
41conn.adbc_connection.set_options(
42 **{
43 ConnectionOptions.TIMEOUT_FETCH.value: 30.0,
44 ConnectionOptions.TIMEOUT_QUERY.value: 30.0,
45 ConnectionOptions.TIMEOUT_UPDATE.value: 30.0,
46 }
47)
These options will apply to all cursors we create.
51with conn.cursor() as cur:
52 cur.execute("SELECT 1")
53
54 assert cur.fetchone() == (1,)
55
56conn.close()
Set the max gRPC message size¶
Recipe source: flightsql_sqlite_max_msg_size.py
By default, the Flight SQL driver limits the size of incoming/outgoing messages. You might see an error like this if those limits are exceeded:
INTERNAL: [FlightSQL] grpc: received message larger than max
These limits can be adjusted to avoid this.
27import os
28
29import adbc_driver_flightsql.dbapi
30from adbc_driver_flightsql import DatabaseOptions
31
32uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
This query generates about 16 MiB per batch, which will trip the default limit.
37query = """
38WITH RECURSIVE generate_series(value) AS (
39 SELECT 1
40 UNION ALL
41 SELECT value + 1 FROM generate_series
42 WHERE value + 1 <= 2048
43)
44SELECT printf('%.*c', 16384, 'x') FROM generate_series
45"""
When we execute the query, we’ll get an error.
49conn = adbc_driver_flightsql.dbapi.connect(uri)
50with conn.cursor() as cur:
51 cur.execute(query)
52
53 try:
54 cur.fetchallarrow()
55 except adbc_driver_flightsql.dbapi.InternalError:
56 # This exception is expected.
57 pass
58 else:
59 assert False, "Did not raise expected exception"
60
61conn.close()
We can instead change the limit when connecting.
65conn = adbc_driver_flightsql.dbapi.connect(
66 uri,
67 db_kwargs={DatabaseOptions.WITH_MAX_MSG_SIZE.value: "2147483647"},
68)
69with conn.cursor() as cur:
70 cur.execute(query)
71
72 assert len(cur.fetchallarrow()) == 2048
73
74conn.close()
Connect with OAuth 2.0 Client Credentials¶
Recipe source: flightsql_oauth_client_credentials.py
The Flight SQL driver supports OAuth 2.0 authentication. This example shows how to connect using the Client Credentials flow (RFC 6749), which is suitable for machine-to-machine authentication without user interaction.
24import os
25
26import adbc_driver_flightsql.dbapi
27from adbc_driver_flightsql import DatabaseOptions, OAuthFlowType
28
29uri = os.environ["ADBC_TEST_FLIGHTSQL_URI"]
30token_uri = os.environ["ADBC_OAUTH_TOKEN_URI"]
31client_id = os.environ["ADBC_OAUTH_CLIENT_ID"]
32client_secret = os.environ["ADBC_OAUTH_CLIENT_SECRET"]
Connect using OAuth 2.0 Client Credentials flow. The driver will automatically obtain and refresh access tokens.
37db_kwargs = {
38 DatabaseOptions.OAUTH_FLOW.value: OAuthFlowType.CLIENT_CREDENTIALS.value,
39 DatabaseOptions.OAUTH_TOKEN_URI.value: token_uri,
40 DatabaseOptions.OAUTH_CLIENT_ID.value: client_id,
41 DatabaseOptions.OAUTH_CLIENT_SECRET.value: client_secret,
Optionally, request specific scopes
43 # DatabaseOptions.OAUTH_SCOPE.value: "dremio.all",
44}
For testing with self-signed certificates, skip TLS verification. In production, you should provide proper TLS certificates.
48if os.environ.get("ADBC_OAUTH_SKIP_VERIFY", "true").lower() in ("1", "true"):
49 db_kwargs[DatabaseOptions.TLS_SKIP_VERIFY.value] = "true"
50
51conn = adbc_driver_flightsql.dbapi.connect(uri, db_kwargs=db_kwargs)
We can then execute queries as usual.
55with conn.cursor() as cur:
56 cur.execute("SELECT 1")
57
58 result = cur.fetchone()
59 print(result)
60
61conn.close()
Connect with OAuth 2.0 Token Exchange¶
Recipe source: flightsql_oauth_token_exchange.py
The Flight SQL driver supports OAuth 2.0 Token Exchange (RFC 8693). This allows exchanging an existing token (e.g., a JWT from an identity provider) for a new token that can be used to access the Flight SQL service.
24import os
25
26import adbc_driver_flightsql.dbapi
27from adbc_driver_flightsql import DatabaseOptions, OAuthFlowType, OAuthTokenType
28
29uri = os.environ["ADBC_TEST_FLIGHTSQL_URI"]
30token_uri = os.environ["ADBC_OAUTH_TOKEN_URI"]
This is typically a JWT or other token from your identity provider
32subject_token = os.environ["ADBC_OAUTH_SUBJECT_TOKEN"]
For testing with self-signed certificates, skip TLS verification. In production, you should provide proper TLS certificates.
36db_kwargs = {}
37if os.environ.get("ADBC_OAUTH_SKIP_VERIFY", "true").lower() in ("1", "true"):
38 db_kwargs[DatabaseOptions.TLS_SKIP_VERIFY.value] = "true"
Connect using OAuth 2.0 Token Exchange flow. The driver will exchange the subject token for an access token.
43db_kwargs.update(
44 {
45 DatabaseOptions.OAUTH_FLOW.value: OAuthFlowType.TOKEN_EXCHANGE.value,
46 DatabaseOptions.OAUTH_TOKEN_URI.value: token_uri,
47 DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKEN.value: subject_token,
Specify the type of the subject token being exchanged
49 DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.value: (
50 OAuthTokenType.JWT.value
51 ),
Optionally, specify the type of token you want to receive
53 # DatabaseOptions.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.value:
54 # OAuthTokenType.ACCESS_TOKEN.value,
Optionally, specify the intended audience
56 # DatabaseOptions.OAUTH_EXCHANGE_AUD.value: "my-service",
57 }
58)
59
60conn = adbc_driver_flightsql.dbapi.connect(uri, db_kwargs=db_kwargs)
We can then execute queries as usual.
64with conn.cursor() as cur:
65 cur.execute("SELECT 1")
66
67 result = cur.fetchone()
68 print(result)
69
70conn.close()