PostgreSQL Recipes

Authenticate with a username and password

Recipe source: postgresql_authenticate.py

To connect to a PostgreSQL database, the username and password must be provided in the URI. For example,

postgresql://username:password@hostname:port/dbname

See the PostgreSQL documentation for full details.

30import os
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
36
37with conn.cursor() as cur:
38    cur.execute("SELECT 1")
39    assert cur.fetchone() == (1,)
40
41conn.close()

Create/append to a table from an Arrow dataset

Recipe source: postgresql_create_dataset_table.py

ADBC makes it easy to load PyArrow datasets into your datastore.

22import os
23import tempfile
24from pathlib import Path
25
26import pyarrow
27import pyarrow.csv
28import pyarrow.dataset
29import pyarrow.feather
30import pyarrow.parquet
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

39with conn.cursor() as cur:
40    cur.execute("DROP TABLE IF EXISTS csvtable")
41    cur.execute("DROP TABLE IF EXISTS ipctable")
42    cur.execute("DROP TABLE IF EXISTS pqtable")
43    cur.execute("DROP TABLE IF EXISTS csvdataset")
44    cur.execute("DROP TABLE IF EXISTS ipcdataset")
45    cur.execute("DROP TABLE IF EXISTS pqdataset")
46
47conn.commit()

Generating sample data

52tempdir = tempfile.TemporaryDirectory(
53    prefix="adbc-docs-",
54    ignore_cleanup_errors=True,
55)
56root = Path(tempdir.name)
57table = pyarrow.table(
58    [
59        [1, 1, 2],
60        ["foo", "bar", "baz"],
61    ],
62    names=["ints", "strs"],
63)

First we’ll write single files.

67csv_file = root / "example.csv"
68pyarrow.csv.write_csv(table, csv_file)
69
70ipc_file = root / "example.arrow"
71pyarrow.feather.write_feather(table, ipc_file)
72
73parquet_file = root / "example.parquet"
74pyarrow.parquet.write_table(table, parquet_file)

We’ll also generate some partitioned datasets.

 78csv_dataset = root / "csv_dataset"
 79pyarrow.dataset.write_dataset(
 80    table,
 81    csv_dataset,
 82    format="csv",
 83    partitioning=["ints"],
 84)
 85
 86ipc_dataset = root / "ipc_dataset"
 87pyarrow.dataset.write_dataset(
 88    table,
 89    ipc_dataset,
 90    format="feather",
 91    partitioning=["ints"],
 92)
 93
 94parquet_dataset = root / "parquet_dataset"
 95pyarrow.dataset.write_dataset(
 96    table,
 97    parquet_dataset,
 98    format="parquet",
 99    partitioning=["ints"],
100)

Loading CSV Files into PostgreSQL

We can directly pass a pyarrow.RecordBatchReader (from open_csv) to adbc_ingest. We can also pass a pyarrow.dataset.Dataset, or a pyarrow.dataset.Scanner.

110with conn.cursor() as cur:
111    reader = pyarrow.csv.open_csv(csv_file)
112    cur.adbc_ingest("csvtable", reader, mode="create")
113
114    reader = pyarrow.dataset.dataset(
115        csv_dataset,
116        format="csv",
117        partitioning=["ints"],
118    )
119    cur.adbc_ingest("csvdataset", reader, mode="create")
120
121conn.commit()
122
123with conn.cursor() as cur:
124    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
125    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
126
127    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
128    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Loading Arrow IPC (Feather) Files into PostgreSQL

133with conn.cursor() as cur:
134    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)

Because of quirks in the PyArrow API, we have to read the file into memory.

137    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")

The Dataset API, however, will stream the data into PostgreSQL batch by batch, without first loading the whole file into memory.

141    reader = pyarrow.dataset.dataset(
142        ipc_dataset,
143        format="feather",
144        partitioning=["ints"],
145    )
146    cur.adbc_ingest("ipcdataset", reader, mode="create")
147
148conn.commit()
149
150with conn.cursor() as cur:
151    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
152    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
153
154    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
155    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Loading Parquet Files into PostgreSQL

160with conn.cursor() as cur:
161    reader = pyarrow.parquet.ParquetFile(parquet_file)
162    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
163
164    reader = pyarrow.dataset.dataset(
165        parquet_dataset,
166        format="parquet",
167        partitioning=["ints"],
168    )
169    cur.adbc_ingest("pqdataset", reader, mode="create")
170
171conn.commit()
172
173with conn.cursor() as cur:
174    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
175    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
176
177    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
178    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

Cleanup

183conn.close()
184tempdir.cleanup()

Create/append to a table from an Arrow table

Recipe source: postgresql_create_append_table.py

ADBC allows creating and appending to database tables using Arrow tables.

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

33with conn.cursor() as cur:
34    cur.execute("DROP TABLE IF EXISTS example")
35    cur.execute("DROP TABLE IF EXISTS example2")

Now we can create the table.

38with conn.cursor() as cur:
39    data = pyarrow.table(
40        [
41            [1, 2, None, 4],
42        ],
43        schema=pyarrow.schema(
44            [
45                ("ints", "int32"),
46            ]
47        ),
48    )
49    cur.adbc_ingest("example", data, mode="create")
50
51conn.commit()

After ingestion, we can fetch the result.

54with conn.cursor() as cur:
55    cur.execute("SELECT * FROM example")
56    assert cur.fetchone() == (1,)
57    assert cur.fetchone() == (2,)
58
59    cur.execute("SELECT COUNT(*) FROM example")
60    assert cur.fetchone() == (4,)

If we try to ingest again, it’ll fail, because the table already exists.

64with conn.cursor() as cur:
65    try:
66        cur.adbc_ingest("example", data, mode="create")
67    except conn.ProgrammingError:
68        pass
69    else:
70        raise RuntimeError("Should have failed!")
71
72conn.rollback()

Instead, we can append to the table.

75with conn.cursor() as cur:
76    cur.adbc_ingest("example", data, mode="append")
77
78    cur.execute("SELECT COUNT(*) FROM example")
79    assert cur.fetchone() == (8,)

We can also choose to create the table if it doesn’t exist, and otherwise append.

84with conn.cursor() as cur:
85    cur.adbc_ingest("example2", data, mode="create_append")
86
87    cur.execute("SELECT COUNT(*) FROM example2")
88    assert cur.fetchone() == (4,)
89
90    cur.adbc_ingest("example2", data, mode="create_append")
91
92    cur.execute("SELECT COUNT(*) FROM example2")
93    assert cur.fetchone() == (8,)

Finally, we can replace the table.

 97with conn.cursor() as cur:
 98    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
 99
100    cur.execute("SELECT COUNT(*) FROM example")
101    assert cur.fetchone() == (2,)
102
103conn.close()

Create/append to a temporary table

Recipe source: postgresql_create_temp_table.py

ADBC allows creating and appending to temporary tables as well.

21import os
22
23import pyarrow
24
25import adbc_driver_postgresql.dbapi
26
27uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
28conn = adbc_driver_postgresql.dbapi.connect(uri)

For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")

To create a temporary table, just specify the option “temporary”.

36data = pyarrow.table(
37    [
38        [1, 2, None, 4],
39    ],
40    schema=pyarrow.schema(
41        [
42            ("ints", "int32"),
43        ]
44    ),
45)
46
47with conn.cursor() as cur:
48    cur.adbc_ingest("example", data, mode="create", temporary=True)
49
50conn.commit()

After ingestion, we can fetch the result.

53with conn.cursor() as cur:
54    cur.execute("SELECT * FROM example")
55    assert cur.fetchone() == (1,)
56    assert cur.fetchone() == (2,)
57
58    cur.execute("SELECT COUNT(*) FROM example")
59    assert cur.fetchone() == (4,)

Temporary tables are separate from regular tables, even if they have the same name.

64with conn.cursor() as cur:
65    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
66
67conn.commit()
68
69with conn.cursor() as cur:

Because we have two tables with the same name, we have to explicitly qualify the regular (non-temporary) table here with its schema, public; an unqualified reference resolves to the temporary table.

72    cur.execute("SELECT COUNT(*) FROM public.example")
73    assert cur.fetchone() == (2,)
74
75    cur.execute("SELECT COUNT(*) FROM example")
76    assert cur.fetchone() == (4,)
77
78conn.close()

After closing the connection, the temporary table is implicitly dropped. If we reconnect, the table won’t exist; we’ll see only the ‘normal’ table.

83with adbc_driver_postgresql.dbapi.connect(uri) as conn:
84    with conn.cursor() as cur:
85        cur.execute("SELECT COUNT(*) FROM example")
86        assert cur.fetchone() == (2,)

All the regular ingestion options apply to temporary tables, too. See Create/append to a table from an Arrow dataset for more examples.

Execute a statement with bind parameters

Recipe source: postgresql_execute_bind.py

ADBC allows using Python and Arrow values as bind parameters. Right now, the PostgreSQL driver only supports bind parameters for queries that don’t generate result sets.

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to test.

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()

We can bind Python values:

41with conn.cursor() as cur:
42    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
43
44    cur.execute("SELECT SUM(ints) FROM example")
45    assert cur.fetchone() == (4,)

Note

If you’re used to the format-string style %s syntax that libraries like psycopg use for bind parameters, note that this is not supported—only the PostgreSQL-native $1 syntax.

We can also bind Arrow values:

52with conn.cursor() as cur:
53    data = pyarrow.record_batch(
54        [
55            [5, 6],
56            [7, 8],
57        ],
58        names=["$1", "$2"],
59    )
60    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
61
62    cur.execute("SELECT SUM(ints) FROM example")
63    assert cur.fetchone() == (15,)
64
65conn.close()

Get the Arrow schema of a table

Recipe source: postgresql_get_table_schema.py

ADBC lets you get the schema of a table as an Arrow schema.

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create some example tables to test.

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
37    cur.execute("DROP TABLE IF EXISTS other_schema.example")
38    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values NUMERIC)")
39
40conn.commit()

By default the “active” catalog/schema are assumed.

43assert conn.adbc_get_table_schema("example") == pyarrow.schema(
44    [
45        ("ints", "int32"),
46        ("bigints", "int64"),
47    ]
48)

We can explicitly specify the PostgreSQL schema to get the Arrow schema of a table in a different namespace.

Note

In PostgreSQL, you can only query the database (catalog) that you are connected to. So we cannot specify the catalog here (or rather, there is no point in doing so).

Note that the NUMERIC column is read as a string, because PostgreSQL decimals do not map onto Arrow decimals.

59assert conn.adbc_get_table_schema(
60    "example",
61    db_schema_filter="other_schema",
62) == pyarrow.schema(
63    [
64        ("strings", "string"),
65        ("values", "string"),
66    ]
67)
68
69conn.close()

Get the Arrow schema of a query

Recipe source: postgresql_get_query_schema.py

ADBC lets you get the schema of a result set, without executing the query.

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to test.

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
37
38expected = pyarrow.schema(
39    [
40        ("ints", "int32"),
41        ("bigints", "int64"),
42    ]
43)
44
45with conn.cursor() as cur:
46    assert cur.adbc_execute_schema("SELECT * FROM example") == expected

PostgreSQL doesn’t know the type here, so it just returns a guess.

49    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
50        [
51            ("res", "string"),
52        ]
53    )
54
55conn.close()

List catalogs, schemas, and tables

Recipe source: postgresql_list_catalogs.py

ADBC allows listing tables, catalogs, and schemas in the database.

22import os
23
24import adbc_driver_postgresql.dbapi
25
26uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
27conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll create an example table to look for.

30with conn.cursor() as cur:
31    cur.execute("DROP TABLE IF EXISTS example")
32    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
33
34conn.commit()

The data is given as a PyArrow RecordBatchReader.

37objects = conn.adbc_get_objects(depth="all").read_all()

We’ll convert it to plain Python data for convenience.

40objects = objects.to_pylist()
41catalog = objects[0]
42assert catalog["catalog_name"] == "postgres"
43
44db_schema = catalog["catalog_db_schemas"][0]
45assert db_schema["db_schema_name"] == "public"
46
47tables = db_schema["db_schema_tables"]
48example = [table for table in tables if table["table_name"] == "example"]
49assert len(example) == 1
50example = example[0]
51
52assert example["table_columns"][0]["column_name"] == "ints"
53assert example["table_columns"][1]["column_name"] == "bigints"
54
55conn.close()

Connection pooling with SQLAlchemy

Recipe source: postgresql_pool.py

ADBC does not implement connection pooling, as this is not generally a feature of DBAPI drivers. Instead, use a third party connection pool like the one built into SQLAlchemy.

26import os
27
28import sqlalchemy.pool
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33
34source = adbc_driver_postgresql.dbapi.connect(uri)

adbc_driver_manager.dbapi.Connection.adbc_clone() opens a new connection from an existing connection, sharing internal resources where possible. For example, the PostgreSQL driver will share the internal OID cache, saving some overhead on connection.

39pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)

We can now get connections out of the pool; SQLAlchemy overrides close() to return the connection to the pool.

Note

SQLAlchemy’s wrapper does not support the context manager protocol, unlike the underlying ADBC connection.

47conn = pool.connect()
48
49assert pool.checkedin() == 0
50assert pool.checkedout() == 1
51
52with conn.cursor() as cur:
53    cur.execute("SELECT 1")
54    assert cur.fetchone() == (1,)
55
56conn.close()
57
58assert pool.checkedin() == 1
59assert pool.checkedout() == 0
60
61source.close()

Using Pandas and ADBC

Recipe source: postgresql_pandas.py

ADBC is integrated into pandas, a popular dataframe library. Pandas can use ADBC to exchange data with PostgreSQL and other databases. Compared to using SQLAlchemy or other options, using ADBC with pandas can have better performance, such as by avoiding excess conversions to and from Python objects.

28import os
29
30import pandas as pd
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

We’ll use pd.DataFrame.to_sql to create a sample table.

40data = pd.DataFrame(
41    {
42        "ints": [1, 2, None, 4],
43        "strs": ["a", "b", "c", "d"],
44    }
45)
46data.to_sql("example", conn, if_exists="replace")
47conn.commit()

After creating the table, we can pass an ADBC connection and a SQL query to pd.read_sql to get the result set as a pandas DataFrame.

53df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
54
55assert len(df) == 2
56
57conn.close()

Compared to the ADBC interface, pandas offers a more convenient and higher level API, especially for those already using pandas.

Using Polars and ADBC

Recipe source: postgresql_polars.py

ADBC can be used with Polars, a dataframe library written in Rust. As per its documentation:

If the backend supports returning Arrow data directly then this facility will be used to efficiently instantiate the DataFrame; otherwise, the DataFrame is initialised from row-wise data.

Obviously, ADBC returns Arrow data directly, making ADBC and Polars a natural fit for each other.

32import os
33
34import polars as pl
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

We’ll use Polars to create a sample table with polars.DataFrame.write_database(). We don’t need to open an ADBC connection ourselves with Polars.

42data = pl.DataFrame(
43    {
44        "ints": [1, 2, None, 4],
45        "strs": ["a", "b", "c", "d"],
46    }
47)
48data.write_database("example", uri, engine="adbc", if_table_exists="replace")

After creating the table, we can use polars.read_database_uri() to fetch the result. Again, we can just pass the URI and tell Polars to manage ADBC for us.

54df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
55
56assert len(df) == 2