Flight SQL Recipes

Some of these recipes are written againt a demo Flight SQL service backed by SQLite. You can run it yourself as follows:

$ go install github.com/apache/arrow/go/v${ARROW_MAJOR_VERSION}/arrow/flight/flightsql/example/cmd/sqlite_flightsql_server@latest
$ sqlite_flightsql_server -host 0.0.0.0 -port 8080

Other recipes work using the OSS version of Dremio:

$ docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 dremio/dremio-oss

If you have the ADBC repository checked out and Docker Compose installed, you can use our configuration to run both services:

$ docker compose up --detach --wait dremio dremio-init flightsql-sqlite-test

Connect to an unsecured Flight SQL service

Recipe source: flightsql_sqlite_connect.py

To connect to an unsecured Flight SQL service, just provide the URI.

22import os
23
24import adbc_driver_flightsql.dbapi
25
26uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
27conn = adbc_driver_flightsql.dbapi.connect(uri)

We can then execute a simple query.

31with conn.cursor() as cur:
32    cur.execute("SELECT 1")
33
34    assert cur.fetchone() == (1,)
35
36conn.close()

Connect to a Flight SQL service with username and password

Recipe source: flightsql_dremio_connect.py

Dremio requires a username and password. To connect to a Flight SQL service with authentication, provide the options at connection time.

23import os
24
25import adbc_driver_flightsql.dbapi
26import adbc_driver_manager
27
28uri = os.environ["ADBC_DREMIO_FLIGHTSQL_URI"]
29username = os.environ["ADBC_DREMIO_FLIGHTSQL_USER"]
30password = os.environ["ADBC_DREMIO_FLIGHTSQL_PASS"]
31conn = adbc_driver_flightsql.dbapi.connect(
32    uri,
33    db_kwargs={
34        adbc_driver_manager.DatabaseOptions.USERNAME.value: username,
35        adbc_driver_manager.DatabaseOptions.PASSWORD.value: password,
36    },
37)

We can then execute a simple query.

41with conn.cursor() as cur:
42    cur.execute("SELECT 1")
43
44    assert cur.fetchone() == (1,)
45
46conn.close()

Set timeouts and other options

Recipe source: flightsql_sqlite_options.py

The Flight SQL driver supports various options.

22import os
23
24import adbc_driver_flightsql.dbapi
25from adbc_driver_flightsql import ConnectionOptions, DatabaseOptions
26
27uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]

We can enable cookie support, which some server implementations require.

29conn = adbc_driver_flightsql.dbapi.connect(
30    uri,
31    db_kwargs={DatabaseOptions.WITH_COOKIE_MIDDLEWARE.value: "true"},
32)

Other options are set on the connection or statement.

For example, we can add a custom header to all outgoing requests.

37custom_header = f"{ConnectionOptions.RPC_CALL_HEADER_PREFIX.value}x-custom-header"
38conn.adbc_connection.set_options(**{custom_header: "value"})

We can also set timeouts. These are in floating-point seconds.

41conn.adbc_connection.set_options(
42    **{
43        ConnectionOptions.TIMEOUT_FETCH.value: 30.0,
44        ConnectionOptions.TIMEOUT_QUERY.value: 30.0,
45        ConnectionOptions.TIMEOUT_UPDATE.value: 30.0,
46    }
47)

These options will apply to all cursors we create.

51with conn.cursor() as cur:
52    cur.execute("SELECT 1")
53
54    assert cur.fetchone() == (1,)
55
56conn.close()

Set the max gRPC message size

Recipe source: flightsql_sqlite_max_msg_size.py

By default, the Flight SQL driver limits the size of incoming/outgoing messages. You might see an error like this if those limits are exceeded:

INTERNAL: [FlightSQL] grpc: received message larger than max

These limits can be adjusted to avoid this.

27import os
28
29import adbc_driver_flightsql.dbapi
30from adbc_driver_flightsql import DatabaseOptions
31
32uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]

This query generates about 16 MiB per batch, which will trip the default limit.

37query = """
38WITH RECURSIVE generate_series(value) AS (
39  SELECT 1
40  UNION ALL
41  SELECT value + 1 FROM generate_series
42   WHERE value + 1 <= 2048
43)
44SELECT printf('%.*c', 16384, 'x') FROM generate_series
45"""

When we execute the query, we’ll get an error.

49conn = adbc_driver_flightsql.dbapi.connect(uri)
50with conn.cursor() as cur:
51    cur.execute(query)
52
53    try:
54        cur.fetchallarrow()
55    except adbc_driver_flightsql.dbapi.InternalError:
56        # This exception is expected.
57        pass
58    else:
59        assert False, "Did not raise expected exception"
60
61conn.close()

We can instead change the limit when connecting.

65conn = adbc_driver_flightsql.dbapi.connect(
66    uri,
67    db_kwargs={DatabaseOptions.WITH_MAX_MSG_SIZE.value: "2147483647"},
68)
69with conn.cursor() as cur:
70    cur.execute(query)
71
72    assert len(cur.fetchallarrow()) == 2048
73
74conn.close()