PostgreSQL Recipes

Authenticate with a username and password

Recipe source: postgresql_authenticate.py

To connect to a PostgreSQL database, the username and password must be provided in the URI. For example,

postgresql://username:password@hostname:port/dbname

See the PostgreSQL documentation for full details.

32import os
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
38
39with conn.cursor() as cur:
40    cur.execute("SELECT 1")
41    print(cur.fetchone())
42    # Output: (1,)
43
44conn.close()
stdout
(1,)

Create/append to a table from an Arrow dataset

Recipe source: postgresql_create_dataset_table.py

ADBC makes it easy to load PyArrow datasets into your datastore.

24import os
25import tempfile
26from pathlib import Path
27
28import pyarrow
29import pyarrow.csv
30import pyarrow.dataset
31import pyarrow.feather
32import pyarrow.parquet
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

41with conn.cursor() as cur:
42    cur.execute("DROP TABLE IF EXISTS csvtable")
43    cur.execute("DROP TABLE IF EXISTS ipctable")
44    cur.execute("DROP TABLE IF EXISTS pqtable")
45    cur.execute("DROP TABLE IF EXISTS csvdataset")
46    cur.execute("DROP TABLE IF EXISTS ipcdataset")
47    cur.execute("DROP TABLE IF EXISTS pqdataset")
48
49conn.commit()

Generating sample data

54tempdir = tempfile.TemporaryDirectory(
55    prefix="adbc-docs-",
56    ignore_cleanup_errors=True,
57)
58root = Path(tempdir.name)
59table = pyarrow.table(
60    [
61        [1, 1, 2],
62        ["foo", "bar", "baz"],
63    ],
64    names=["ints", "strs"],
65)

First we’ll write single files.

69csv_file = root / "example.csv"
70pyarrow.csv.write_csv(table, csv_file)
71
72ipc_file = root / "example.arrow"
73pyarrow.feather.write_feather(table, ipc_file)
74
75parquet_file = root / "example.parquet"
76pyarrow.parquet.write_table(table, parquet_file)

We’ll also generate some partitioned datasets.

 80csv_dataset = root / "csv_dataset"
 81pyarrow.dataset.write_dataset(
 82    table,
 83    csv_dataset,
 84    format="csv",
 85    partitioning=["ints"],
 86)
 87
 88ipc_dataset = root / "ipc_dataset"
 89pyarrow.dataset.write_dataset(
 90    table,
 91    ipc_dataset,
 92    format="feather",
 93    partitioning=["ints"],
 94)
 95
 96parquet_dataset = root / "parquet_dataset"
 97pyarrow.dataset.write_dataset(
 98    table,
 99    parquet_dataset,
100    format="parquet",
101    partitioning=["ints"],
102)

Loading CSV Files into PostgreSQL

We can directly pass a pyarrow.RecordBatchReader (from open_csv) to adbc_ingest. We can also pass a pyarrow.dataset.Dataset, or a pyarrow.dataset.Scanner.

112with conn.cursor() as cur:
113    reader = pyarrow.csv.open_csv(csv_file)
114    cur.adbc_ingest("csvtable", reader, mode="create")
115
116    reader = pyarrow.dataset.dataset(
117        csv_dataset,
118        format="csv",
119        partitioning=["ints"],
120    )
121    cur.adbc_ingest("csvdataset", reader, mode="create")
122
123conn.commit()
124
125with conn.cursor() as cur:
126    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
127    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
128
129    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
130    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Loading Arrow IPC (Feather) Files into PostgreSQL

135with conn.cursor() as cur:
136    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)

Because of quirks in the PyArrow API, we have to read the file into memory.

139    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")

The Dataset API will stream the data into PostgreSQL, though.

143    reader = pyarrow.dataset.dataset(
144        ipc_dataset,
145        format="feather",
146        partitioning=["ints"],
147    )
148    cur.adbc_ingest("ipcdataset", reader, mode="create")
149
150conn.commit()
151
152with conn.cursor() as cur:
153    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
154    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
155
156    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
157    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Loading Parquet Files into PostgreSQL

162with conn.cursor() as cur:
163    reader = pyarrow.parquet.ParquetFile(parquet_file)
164    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
165
166    reader = pyarrow.dataset.dataset(
167        parquet_dataset,
168        format="parquet",
169        partitioning=["ints"],
170    )
171    cur.adbc_ingest("pqdataset", reader, mode="create")
172
173conn.commit()
174
175with conn.cursor() as cur:
176    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
177    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
178
179    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
180    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Cleanup

185conn.close()
186tempdir.cleanup()

Create/append to a table from an Arrow table

Recipe source: postgresql_create_append_table.py

ADBC allows creating and appending to database tables using Arrow tables.

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

35with conn.cursor() as cur:
36    cur.execute("DROP TABLE IF EXISTS example")
37    cur.execute("DROP TABLE IF EXISTS example2")

Now we can create the table.

40with conn.cursor() as cur:
41    data = pyarrow.table(
42        [
43            [1, 2, None, 4],
44        ],
45        schema=pyarrow.schema(
46            [
47                ("ints", "int32"),
48            ]
49        ),
50    )
51    cur.adbc_ingest("example", data, mode="create")
52
53conn.commit()

After ingestion, we can fetch the result.

56with conn.cursor() as cur:
57    cur.execute("SELECT * FROM example")
58    assert cur.fetchone() == (1,)
59    assert cur.fetchone() == (2,)
60
61    cur.execute("SELECT COUNT(*) FROM example")
62    assert cur.fetchone() == (4,)

If we try to ingest again, it’ll fail, because the table already exists.

66with conn.cursor() as cur:
67    try:
68        cur.adbc_ingest("example", data, mode="create")
69    except conn.ProgrammingError:
70        pass
71    else:
72        raise RuntimeError("Should have failed!")
73
74conn.rollback()

Instead, we can append to the table.

77with conn.cursor() as cur:
78    cur.adbc_ingest("example", data, mode="append")
79
80    cur.execute("SELECT COUNT(*) FROM example")
81    assert cur.fetchone() == (8,)

We can also choose to create the table if it doesn’t exist, and otherwise append.

86with conn.cursor() as cur:
87    cur.adbc_ingest("example2", data, mode="create_append")
88
89    cur.execute("SELECT COUNT(*) FROM example2")
90    assert cur.fetchone() == (4,)
91
92    cur.adbc_ingest("example2", data, mode="create_append")
93
94    cur.execute("SELECT COUNT(*) FROM example2")
95    assert cur.fetchone() == (8,)

Finally, we can replace the table.

 99with conn.cursor() as cur:
100    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
101
102    cur.execute("SELECT COUNT(*) FROM example")
103    assert cur.fetchone() == (2,)
104
105conn.close()

Create/append to a temporary table

Recipe source: postgresql_create_temp_table.py

ADBC allows creating and appending to temporary tables as well.

23import os
24
25import pyarrow
26
27import adbc_driver_postgresql.dbapi
28
29uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
30conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")

To create a temporary table, just specify the option “temporary”.

38data = pyarrow.table(
39    [
40        [1, 2, None, 4],
41    ],
42    schema=pyarrow.schema(
43        [
44            ("ints", "int32"),
45        ]
46    ),
47)
48
49with conn.cursor() as cur:
50    cur.adbc_ingest("example", data, mode="create", temporary=True)
51
52conn.commit()

After ingestion, we can fetch the result.

55with conn.cursor() as cur:
56    cur.execute("SELECT * FROM example")
57    assert cur.fetchone() == (1,)
58    assert cur.fetchone() == (2,)
59
60    cur.execute("SELECT COUNT(*) FROM example")
61    assert cur.fetchone() == (4,)

Temporary tables are separate from regular tables, even if they have the same name.

66with conn.cursor() as cur:
67    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
68
69conn.commit()
70
71with conn.cursor() as cur:

Because we have two tables with the same name, we have to explicitly reference the normal table here.

74    cur.execute("SELECT COUNT(*) FROM public.example")
75    assert cur.fetchone() == (2,)
76
77    cur.execute("SELECT COUNT(*) FROM example")
78    assert cur.fetchone() == (4,)
79
80conn.close()

After closing the connection, the temporary table is implicitly dropped. If we reconnect, the table won’t exist; we’ll see only the ‘normal’ table.

85with adbc_driver_postgresql.dbapi.connect(uri) as conn:
86    with conn.cursor() as cur:
87        cur.execute("SELECT COUNT(*) FROM example")
88        assert cur.fetchone() == (2,)

All the regular ingestion options apply to temporary tables, too. See Create/append to a table from an Arrow dataset for more examples.

Execute a statement with bind parameters

Recipe source: postgresql_execute_bind.py

ADBC allows using Python and Arrow values as bind parameters. Right now, the PostgreSQL driver only supports bind parameters for queries that don’t generate result sets.

26import os
27
28import pyarrow
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to test.

36with conn.cursor() as cur:
37    cur.execute("DROP TABLE IF EXISTS example")
38    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
39
40conn.commit()

We can bind Python values:

43with conn.cursor() as cur:
44    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
45
46    cur.execute("SELECT SUM(ints) FROM example")
47    assert cur.fetchone() == (4,)

Note

If you’re used to the format-string style %s syntax that libraries like psycopg use for bind parameters, note that this is not supported—only the PostgreSQL-native $1 syntax.

We can also bind Arrow values:

54with conn.cursor() as cur:
55    data = pyarrow.record_batch(
56        [
57            [5, 6],
58            [7, 8],
59        ],
60        names=["$1", "$2"],
61    )
62    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
63
64    cur.execute("SELECT SUM(ints) FROM example")
65    assert cur.fetchone() == (15,)
66
67conn.close()

Get the Arrow schema of a table

Recipe source: postgresql_get_table_schema.py

ADBC lets you get the schema of a table as an Arrow schema.

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create some example tables to test.

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
39    cur.execute("DROP TABLE IF EXISTS other_schema.example")
40    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
41
42conn.commit()

By default the “active” catalog/schema are assumed.

45assert conn.adbc_get_table_schema("example") == pyarrow.schema(
46    [
47        ("ints", "int32"),
48        ("bigints", "int64"),
49    ]
50)

We can explicitly specify the PostgreSQL schema to get the Arrow schema of a table in a different namespace.

Note

In PostgreSQL, you can only query the database (catalog) that you are connected to. So we cannot specify the catalog here (or rather, there is no point in doing so).

Note that a NUMERIC column would be read as a string, because PostgreSQL decimals do not map onto Arrow decimals. (The example here uses INT, which maps directly to Arrow int32.)

61assert conn.adbc_get_table_schema(
62    "example",
63    db_schema_filter="other_schema",
64) == pyarrow.schema(
65    [
66        ("strings", "string"),
67        ("values", "int32"),
68    ]
69)
70
71conn.close()

Get the Arrow schema of a query

Recipe source: postgresql_get_query_schema.py

ADBC lets you get the schema of a result set, without executing the query.

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to test.

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()
39
40expected = pyarrow.schema(
41    [
42        ("ints", "int32"),
43        ("bigints", "int64"),
44    ]
45)
46
47with conn.cursor() as cur:
48    assert cur.adbc_execute_schema("SELECT * FROM example") == expected

PostgreSQL doesn’t know the type here, so it just returns a guess.

51    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
52        [
53            ("res", "string"),
54        ]
55    )
56
57conn.close()

List catalogs, schemas, and tables

Recipe source: postgresql_list_catalogs.py

ADBC allows listing tables, catalogs, and schemas in the database.

24import os
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to look for.

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()

The data is given as a PyArrow RecordBatchReader.

39objects = conn.adbc_get_objects(depth="all").read_all()

We’ll convert it to plain Python data for convenience.

42objects = objects.to_pylist()
43catalog = objects[0]
44assert catalog["catalog_name"] == "postgres"
45
46db_schema = catalog["catalog_db_schemas"][0]
47assert db_schema["db_schema_name"] == "public"
48
49tables = db_schema["db_schema_tables"]
50example = [table for table in tables if table["table_name"] == "example"]
51assert len(example) == 1
52example = example[0]
53
54assert example["table_columns"][0]["column_name"] == "ints"
55assert example["table_columns"][1]["column_name"] == "bigints"
56
57conn.close()

Connection pooling with SQLAlchemy

Recipe source: postgresql_pool.py

ADBC does not implement connection pooling, as this is not generally a feature of DBAPI drivers. Instead, use a third party connection pool like the one built into SQLAlchemy.

28import os
29
30import sqlalchemy.pool
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35
36source = adbc_driver_postgresql.dbapi.connect(uri)

adbc_driver_manager.dbapi.Connection.adbc_clone() opens a new connection from an existing connection, sharing internal resources where possible. For example, the PostgreSQL driver will share the internal OID cache, saving some overhead on connection.

41pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)

We can now get connections out of the pool; SQLAlchemy overrides close() to return the connection to the pool.

Note

SQLAlchemy’s wrapper does not support the context manager protocol, unlike the underlying ADBC connection.

49conn = pool.connect()
50
51assert pool.checkedin() == 0
52assert pool.checkedout() == 1
53
54with conn.cursor() as cur:
55    cur.execute("SELECT 1")
56    assert cur.fetchone() == (1,)
57
58conn.close()
59
60assert pool.checkedin() == 1
61assert pool.checkedout() == 0
62
63source.close()

Using Pandas and ADBC

Recipe source: postgresql_pandas.py

ADBC is integrated into pandas, a popular dataframe library. Pandas can use ADBC to exchange data with PostgreSQL and other databases. Compared to using SQLAlchemy or other options, using ADBC with pandas can have better performance, such as by avoiding excess conversions to and from Python objects.

30import os
31
32import pandas as pd
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll use pd.DataFrame.to_sql to create a sample table.

42data = pd.DataFrame(
43    {
44        "ints": [1, 2, None, 4],
45        "strs": ["a", "b", "c", "d"],
46    }
47)
48data.to_sql("example", conn, if_exists="replace")
49conn.commit()

After creating the table, we can pass an ADBC connection and a SQL query to pd.read_sql to get the result set as a pandas DataFrame.

55df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
56
57assert len(df) == 2
58
59conn.close()

Compared to the ADBC interface, pandas offers a more convenient and higher level API, especially for those already using pandas.

Using Polars and ADBC

Recipe source: postgresql_polars.py

ADBC can be used with Polars, a dataframe library written in Rust. As per its documentation:

If the backend supports returning Arrow data directly then this facility will be used to efficiently instantiate the DataFrame; otherwise, the DataFrame is initialised from row-wise data.

Obviously, ADBC returns Arrow data directly, making ADBC and Polars a natural fit for each other.

34import os
35
36import polars as pl
37
38uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

We’ll use Polars to create a sample table with polars.DataFrame.write_database(). We don’t need to open an ADBC connection ourselves with Polars.

44data = pl.DataFrame(
45    {
46        "ints": [1, 2, None, 4],
47        "strs": ["a", "b", "c", "d"],
48    }
49)
50data.write_database("example", uri, engine="adbc", if_table_exists="replace")

After creating the table, we can use polars.read_database_uri() to fetch the result. Again, we can just pass the URI and tell Polars to manage ADBC for us.

56df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
57
58assert len(df) == 2