PostgreSQL Recipes¶
Authenticate with a username and password¶
Recipe source: postgresql_authenticate.py
To connect to a PostgreSQL database, the username and password must be provided in the URI. For example,
postgresql://username:password@hostname:port/dbname
See the PostgreSQL documentation for full details.
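If the username or password contains reserved characters such as "@" or "/", they must be percent-encoded before being embedded in the URI. A minimal sketch using the standard library (the credentials and host below are hypothetical):

import urllib.parse

# Hypothetical credentials for illustration only; quote_plus escapes
# characters that would otherwise break URI parsing.
username = urllib.parse.quote_plus("alice")
password = urllib.parse.quote_plus("p@ss/word")
uri = f"postgresql://{username}:{password}@localhost:5432/postgres"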
import os

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)

with conn.cursor() as cur:
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)

conn.close()
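The DBAPI connection also works as a context manager (a later recipe on temporary tables uses this form), which closes the connection automatically:

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        assert cur.fetchone() == (1,)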
Create/append to a table from an Arrow dataset¶
Recipe source: postgresql_create_dataset_table.py
ADBC makes it easy to load PyArrow datasets into your datastore.
import os
import tempfile
from pathlib import Path

import pyarrow
import pyarrow.csv
import pyarrow.dataset
import pyarrow.feather
import pyarrow.parquet

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS csvtable")
    cur.execute("DROP TABLE IF EXISTS ipctable")
    cur.execute("DROP TABLE IF EXISTS pqtable")
    cur.execute("DROP TABLE IF EXISTS csvdataset")
    cur.execute("DROP TABLE IF EXISTS ipcdataset")
    cur.execute("DROP TABLE IF EXISTS pqdataset")

conn.commit()
Generating sample data¶
tempdir = tempfile.TemporaryDirectory(
    prefix="adbc-docs-",
    ignore_cleanup_errors=True,
)
root = Path(tempdir.name)
table = pyarrow.table(
    [
        [1, 1, 2],
        ["foo", "bar", "baz"],
    ],
    names=["ints", "strs"],
)
First we’ll write single files.
csv_file = root / "example.csv"
pyarrow.csv.write_csv(table, csv_file)

ipc_file = root / "example.arrow"
pyarrow.feather.write_feather(table, ipc_file)

parquet_file = root / "example.parquet"
pyarrow.parquet.write_table(table, parquet_file)
We’ll also generate some partitioned datasets.
csv_dataset = root / "csv_dataset"
pyarrow.dataset.write_dataset(
    table,
    csv_dataset,
    format="csv",
    partitioning=["ints"],
)

ipc_dataset = root / "ipc_dataset"
pyarrow.dataset.write_dataset(
    table,
    ipc_dataset,
    format="feather",
    partitioning=["ints"],
)

parquet_dataset = root / "parquet_dataset"
pyarrow.dataset.write_dataset(
    table,
    parquet_dataset,
    format="parquet",
    partitioning=["ints"],
)
Loading CSV Files into PostgreSQL¶
We can directly pass a pyarrow.RecordBatchReader (from open_csv) to adbc_ingest. We can also pass a pyarrow.dataset.Dataset, or a pyarrow.dataset.Scanner.
with conn.cursor() as cur:
    reader = pyarrow.csv.open_csv(csv_file)
    cur.adbc_ingest("csvtable", reader, mode="create")

    reader = pyarrow.dataset.dataset(
        csv_dataset,
        format="csv",
        partitioning=["ints"],
    )
    cur.adbc_ingest("csvdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
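The snippet above covers readers and datasets; a pyarrow.dataset.Scanner works too, and lets you project columns or filter rows before ingestion. A sketch, assuming the same csv_dataset files as above (the table name csvscanner is hypothetical):

with conn.cursor() as cur:
    dataset = pyarrow.dataset.dataset(
        csv_dataset,
        format="csv",
        partitioning=["ints"],
    )
    # Scanners can project or filter; here we simply select both columns.
    scanner = dataset.scanner(columns=["ints", "strs"])
    cur.adbc_ingest("csvscanner", scanner, mode="create")

conn.commit()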
Loading Arrow IPC (Feather) Files into PostgreSQL¶
with conn.cursor() as cur:
    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
Because of quirks in the PyArrow API, we have to read the file into memory.
    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
The Dataset API, however, will stream the data through memory and into PostgreSQL chunk by chunk, rather than reading the whole file up front.
    reader = pyarrow.dataset.dataset(
        ipc_dataset,
        format="feather",
        partitioning=["ints"],
    )
    cur.adbc_ingest("ipcdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
Loading Parquet Files into PostgreSQL¶
with conn.cursor() as cur:
    reader = pyarrow.parquet.ParquetFile(parquet_file)
    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")

    reader = pyarrow.dataset.dataset(
        parquet_dataset,
        format="parquet",
        partitioning=["ints"],
    )
    cur.adbc_ingest("pqdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
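If memory use during Parquet ingestion is a concern, iter_batches() accepts a batch_size argument that bounds how many rows are materialized per chunk. A sketch (the batch size chosen here is an arbitrary assumption):

with conn.cursor() as cur:
    reader = pyarrow.parquet.ParquetFile(parquet_file)
    # Re-ingest in bounded chunks; 65536 rows per batch is arbitrary.
    cur.adbc_ingest(
        "pqtable", reader.iter_batches(batch_size=65536), mode="replace"
    )

conn.commit()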
Cleanup¶
conn.close()
tempdir.cleanup()
Create/append to a table from an Arrow table¶
Recipe source: postgresql_create_append_table.py
ADBC allows creating and appending to database tables using Arrow tables.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("DROP TABLE IF EXISTS example2")
Now we can create the table.
with conn.cursor() as cur:
    data = pyarrow.table(
        [
            [1, 2, None, 4],
        ],
        schema=pyarrow.schema(
            [
                ("ints", "int32"),
            ]
        ),
    )
    cur.adbc_ingest("example", data, mode="create")

conn.commit()
After ingestion, we can fetch the result.
with conn.cursor() as cur:
    cur.execute("SELECT * FROM example")
    assert cur.fetchone() == (1,)
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)
If we try to ingest again, it’ll fail, because the table already exists.
with conn.cursor() as cur:
    try:
        cur.adbc_ingest("example", data, mode="create")
    except conn.ProgrammingError:
        pass
    else:
        raise RuntimeError("Should have failed!")

conn.rollback()
Instead, we can append to the table.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data, mode="append")

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (8,)
We can also choose to create the table if it doesn’t exist, and otherwise append.
with conn.cursor() as cur:
    cur.adbc_ingest("example2", data, mode="create_append")

    cur.execute("SELECT COUNT(*) FROM example2")
    assert cur.fetchone() == (4,)

    cur.adbc_ingest("example2", data, mode="create_append")

    cur.execute("SELECT COUNT(*) FROM example2")
    assert cur.fetchone() == (8,)
Finally, we can replace the table.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (2,)

conn.close()
Create/append to a temporary table¶
Recipe source: postgresql_create_temp_table.py
ADBC allows creating and appending to temporary tables as well.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
To create a temporary table, just specify the option “temporary”.
data = pyarrow.table(
    [
        [1, 2, None, 4],
    ],
    schema=pyarrow.schema(
        [
            ("ints", "int32"),
        ]
    ),
)

with conn.cursor() as cur:
    cur.adbc_ingest("example", data, mode="create", temporary=True)

conn.commit()
After ingestion, we can fetch the result.
with conn.cursor() as cur:
    cur.execute("SELECT * FROM example")
    assert cur.fetchone() == (1,)
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)
Temporary tables are separate from regular tables, even if they have the same name.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)

conn.commit()

with conn.cursor() as cur:
Because we have two tables with the same name, we have to explicitly reference the normal (non-temporary) table here.
    cur.execute("SELECT COUNT(*) FROM public.example")
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)

conn.close()
After closing the connection, the temporary table is implicitly dropped. If we reconnect, the table won’t exist; we’ll see only the ‘normal’ table.
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM example")
        assert cur.fetchone() == (2,)
All the regular ingestion options apply to temporary tables, too. See Create/append to a table from an Arrow dataset for more examples.
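For instance, appending to a temporary table is just a matter of passing temporary=True alongside mode="append". A sketch, reusing the data table from above on a fresh connection:

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        cur.adbc_ingest("example", data, mode="create", temporary=True)
        # Appends target the temporary table as long as temporary=True.
        cur.adbc_ingest("example", data, mode="append", temporary=True)

        cur.execute("SELECT COUNT(*) FROM example")
        assert cur.fetchone() == (8,)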
Execute a statement with bind parameters¶
Recipe source: postgresql_execute_bind.py
ADBC allows using Python and Arrow values as bind parameters. Right now, the PostgreSQL driver only supports bind parameters for queries that don’t generate result sets.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()
We can bind Python values:
with conn.cursor() as cur:
    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])

    cur.execute("SELECT SUM(ints) FROM example")
    assert cur.fetchone() == (4,)
Note
If you're used to the format-string style %s syntax that libraries like psycopg use for bind parameters, note that this is not supported; only the PostgreSQL-native $1 syntax is.
We can also bind Arrow values:
with conn.cursor() as cur:
    data = pyarrow.record_batch(
        [
            [5, 6],
            [7, 8],
        ],
        names=["$1", "$2"],
    )
    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)

    cur.execute("SELECT SUM(ints) FROM example")
    assert cur.fetchone() == (15,)

conn.close()
Get the Arrow schema of a table¶
Recipe source: postgresql_get_table_schema.py
ADBC lets you get the schema of a table as an Arrow schema.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create some example tables to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
    cur.execute("DROP TABLE IF EXISTS other_schema.example")
    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")

conn.commit()
By default the “active” catalog/schema are assumed.
assert conn.adbc_get_table_schema("example") == pyarrow.schema(
    [
        ("ints", "int32"),
        ("bigints", "int64"),
    ]
)
We can explicitly specify the PostgreSQL schema to get the Arrow schema of a table in a different namespace.
Note
In PostgreSQL, you can only query the database (catalog) that you are connected to. So we cannot specify the catalog here (or rather, there is no point in doing so).
Note that a NUMERIC column, had the table contained one, would be read as a string, because PostgreSQL decimals do not map exactly onto Arrow decimals.
assert conn.adbc_get_table_schema(
    "example",
    db_schema_filter="other_schema",
) == pyarrow.schema(
    [
        ("strings", "string"),
        ("values", "int32"),
    ]
)

conn.close()
Get the Arrow schema of a query¶
Recipe source: postgresql_get_query_schema.py
ADBC lets you get the schema of a result set, without executing the query.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()

expected = pyarrow.schema(
    [
        ("ints", "int32"),
        ("bigints", "int64"),
    ]
)

with conn.cursor() as cur:
    assert cur.adbc_execute_schema("SELECT * FROM example") == expected
PostgreSQL doesn’t know the type here, so it just returns a guess.
    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
        [
            ("res", "string"),
        ]
    )

conn.close()
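Adding an explicit cast to the placeholder gives PostgreSQL a concrete type to report instead of a guess. A sketch on a fresh connection, assuming the driver maps INT to Arrow int32 as in the recipes above:

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        # With a cast, the server reports int4, i.e. Arrow int32
        # (an assumption worth verifying against your driver version).
        assert cur.adbc_execute_schema("SELECT $1::INT AS res") == pyarrow.schema(
            [
                ("res", "int32"),
            ]
        )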
List catalogs, schemas, and tables¶
Recipe source: postgresql_list_catalogs.py
ADBC allows listing tables, catalogs, and schemas in the database.
import os

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to look for.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()
The data is given as a PyArrow RecordBatchReader.
objects = conn.adbc_get_objects(depth="all").read_all()
We’ll convert it to plain Python data for convenience.
objects = objects.to_pylist()
catalog = objects[0]
assert catalog["catalog_name"] == "postgres"

db_schema = catalog["catalog_db_schemas"][0]
assert db_schema["db_schema_name"] == "public"

tables = db_schema["db_schema_tables"]
example = [table for table in tables if table["table_name"] == "example"]
assert len(example) == 1
example = example[0]

assert example["table_columns"][0]["column_name"] == "ints"
assert example["table_columns"][1]["column_name"] == "bigints"

conn.close()
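If you only care about one table, adbc_get_objects also accepts filter arguments, so you don't have to walk the whole hierarchy. A sketch (the exact keyword name is an assumption; check your adbc_driver_manager version):

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    # table_name_filter narrows the result to matching tables
    # (assumed keyword; verify against your driver manager's docs).
    objects = conn.adbc_get_objects(
        depth="all",
        table_name_filter="example",
    ).read_all()
    catalog = objects.to_pylist()[0]
    matches = [
        table["table_name"]
        for schema in catalog["catalog_db_schemas"]
        for table in (schema["db_schema_tables"] or [])
    ]
    assert "example" in matches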
Connection pooling with SQLAlchemy¶
Recipe source: postgresql_pool.py
ADBC does not implement connection pooling, as this is not generally a feature of DBAPI drivers. Instead, use a third party connection pool like the one built into SQLAlchemy.
import os

import sqlalchemy.pool

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

source = adbc_driver_postgresql.dbapi.connect(uri)
adbc_driver_manager.dbapi.Connection.adbc_clone() opens a new connection from an existing connection, sharing internal resources where possible. For example, the PostgreSQL driver will share the internal OID cache, saving some overhead on connection.
pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)
We can now get connections out of the pool; SQLAlchemy overrides close() to return the connection to the pool.
Note
SQLAlchemy’s wrapper does not support the context manager protocol, unlike the underlying ADBC connection.
conn = pool.connect()

assert pool.checkedin() == 0
assert pool.checkedout() == 1

with conn.cursor() as cur:
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)

conn.close()

assert pool.checkedin() == 1
assert pool.checkedout() == 0

source.close()
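Because the pooled wrapper lacks context-manager support, contextlib.closing from the standard library can restore with-statement ergonomics; close() still just returns the connection to the pool. A sketch, assuming a pool like the one above that has not yet been shut down:

import contextlib

with contextlib.closing(pool.connect()) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        assert cur.fetchone() == (1,)
# The connection is back in the pool after the with block.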
Using Pandas and ADBC¶
Recipe source: postgresql_pandas.py
ADBC is integrated into pandas, a popular dataframe library. Pandas can use ADBC to exchange data with PostgreSQL and other databases. Compared to using SQLAlchemy or other options, using ADBC with pandas can have better performance, such as by avoiding excess conversions to and from Python objects.
import os

import pandas as pd

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We'll use pd.DataFrame.to_sql to create a sample table.
data = pd.DataFrame(
    {
        "ints": [1, 2, None, 4],
        "strs": ["a", "b", "c", "d"],
    }
)
data.to_sql("example", conn, if_exists="replace")
conn.commit()
After creating the table, we can pass an ADBC connection and a SQL query to pd.read_sql to get the result set as a pandas DataFrame.
df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)

assert len(df) == 2

conn.close()
Compared to the ADBC interface, pandas offers a more convenient and higher level API, especially for those already using pandas.
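With pandas 2.0 or newer, read_sql can also keep the result Arrow-backed instead of converting to NumPy, avoiding another copy. A sketch (the dtype_backend parameter assumes pandas >= 2.0):

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    # Arrow-backed columns skip the conversion to NumPy object dtype.
    df = pd.read_sql(
        "SELECT * FROM example WHERE ints > 1", conn, dtype_backend="pyarrow"
    )
    assert len(df) == 2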
Using Polars and ADBC¶
Recipe source: postgresql_polars.py
ADBC can be used with Polars, a dataframe library written in Rust. As per its documentation:
If the backend supports returning Arrow data directly then this facility will be used to efficiently instantiate the DataFrame; otherwise, the DataFrame is initialised from row-wise data.
Obviously, ADBC returns Arrow data directly, making ADBC and Polars a natural fit for each other.
import os

import polars as pl

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
We'll use Polars to create a sample table with polars.DataFrame.write_database(). We don't need to open an ADBC connection ourselves with Polars.
data = pl.DataFrame(
    {
        "ints": [1, 2, None, 4],
        "strs": ["a", "b", "c", "d"],
    }
)
data.write_database("example", uri, engine="adbc", if_table_exists="replace")
After creating the table, we can use polars.read_database_uri() to fetch the result. Again, we can just pass the URI and tell Polars to manage ADBC for us.
df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")

assert len(df) == 2
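If you already hold an open ADBC connection, recent Polars versions can reuse it via polars.read_database() instead of opening a new connection per query (an assumption about your Polars version; check its documentation). A sketch against the example table created above:

import adbc_driver_postgresql.dbapi

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    # Reuse one connection across several queries.
    df = pl.read_database("SELECT * FROM example WHERE ints > 1", conn)
    assert len(df) == 2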