PostgreSQL Recipes
Authenticate with a username and password
Recipe source: postgresql_authenticate.py
To connect to a PostgreSQL database, the username and password must be provided in the URI. For example:
postgresql://username:password@hostname:port/dbname
See the PostgreSQL documentation for full details.
import os

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)

with conn.cursor() as cur:
    cur.execute("SELECT 1")
    print(cur.fetchone())
    # Output: (1,)

conn.close()
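If the username or password contains reserved characters such as @, :, or /, each component must be percent-encoded for the URI to parse correctly. A minimal sketch using the standard library (the credentials and host here are placeholders):
import urllib.parse

# Percent-encode each component separately; quote_plus escapes
# characters like "@" and "/" that would otherwise break the URI.
username = urllib.parse.quote_plus("alice")
password = urllib.parse.quote_plus("p@ss/word")
uri = f"postgresql://{username}:{password}@localhost:5432/postgres"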
Create/append to a table from an Arrow dataset
Recipe source: postgresql_create_dataset_table.py
ADBC makes it easy to load PyArrow datasets into your datastore.
import os
import tempfile
from pathlib import Path

import pyarrow
import pyarrow.csv
import pyarrow.dataset
import pyarrow.feather
import pyarrow.parquet

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS csvtable")
    cur.execute("DROP TABLE IF EXISTS ipctable")
    cur.execute("DROP TABLE IF EXISTS pqtable")
    cur.execute("DROP TABLE IF EXISTS csvdataset")
    cur.execute("DROP TABLE IF EXISTS ipcdataset")
    cur.execute("DROP TABLE IF EXISTS pqdataset")

conn.commit()
Generating sample data
tempdir = tempfile.TemporaryDirectory(
    prefix="adbc-docs-",
    ignore_cleanup_errors=True,
)
root = Path(tempdir.name)
table = pyarrow.table(
    [
        [1, 1, 2],
        ["foo", "bar", "baz"],
    ],
    names=["ints", "strs"],
)
First we’ll write single files.
csv_file = root / "example.csv"
pyarrow.csv.write_csv(table, csv_file)

ipc_file = root / "example.arrow"
pyarrow.feather.write_feather(table, ipc_file)

parquet_file = root / "example.parquet"
pyarrow.parquet.write_table(table, parquet_file)
We’ll also generate some partitioned datasets.
csv_dataset = root / "csv_dataset"
pyarrow.dataset.write_dataset(
    table,
    csv_dataset,
    format="csv",
    partitioning=["ints"],
)

ipc_dataset = root / "ipc_dataset"
pyarrow.dataset.write_dataset(
    table,
    ipc_dataset,
    format="feather",
    partitioning=["ints"],
)

parquet_dataset = root / "parquet_dataset"
pyarrow.dataset.write_dataset(
    table,
    parquet_dataset,
    format="parquet",
    partitioning=["ints"],
)
Loading CSV Files into PostgreSQL
We can directly pass a pyarrow.RecordBatchReader (from open_csv) to adbc_ingest. We can also pass a pyarrow.dataset.Dataset, or a pyarrow.dataset.Scanner (a Scanner sketch follows the CSV example below).
with conn.cursor() as cur:
    reader = pyarrow.csv.open_csv(csv_file)
    cur.adbc_ingest("csvtable", reader, mode="create")

    reader = pyarrow.dataset.dataset(
        csv_dataset,
        format="csv",
        partitioning=["ints"],
    )
    cur.adbc_ingest("csvdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
Loading Arrow IPC (Feather) Files into PostgreSQL
with conn.cursor() as cur:
    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
Because of quirks in the PyArrow API, we have to read the file into memory.
    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
The Dataset API, though, will stream the data into PostgreSQL batch by batch instead of materializing the whole file in memory.
    reader = pyarrow.dataset.dataset(
        ipc_dataset,
        format="feather",
        partitioning=["ints"],
    )
    cur.adbc_ingest("ipcdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
Loading Parquet Files into PostgreSQL
with conn.cursor() as cur:
    reader = pyarrow.parquet.ParquetFile(parquet_file)
    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")

    reader = pyarrow.dataset.dataset(
        parquet_dataset,
        format="parquet",
        partitioning=["ints"],
    )
    cur.adbc_ingest("pqdataset", reader, mode="create")

conn.commit()

with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
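Data round-trips as Arrow, too: in addition to the standard DBAPI fetch methods, ADBC cursors provide fetch_arrow_table(). A short sketch (our own addition) reusing the pqtable from above:
with conn.cursor() as cur:
    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
    # fetch_arrow_table() returns the full result set as a pyarrow.Table.
    result = cur.fetch_arrow_table()
    assert result.column_names == ["ints", "strs"]
    assert result.num_rows == 3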
Cleanup
conn.close()
tempdir.cleanup()
Create/append to a table from an Arrow table
Recipe source: postgresql_create_append_table.py
ADBC allows creating and appending to database tables using Arrow tables.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("DROP TABLE IF EXISTS example2")
Now we can create the table.
with conn.cursor() as cur:
    data = pyarrow.table(
        [
            [1, 2, None, 4],
        ],
        schema=pyarrow.schema(
            [
                ("ints", "int32"),
            ]
        ),
    )
    cur.adbc_ingest("example", data, mode="create")

conn.commit()
After ingestion, we can fetch the result.
with conn.cursor() as cur:
    cur.execute("SELECT * FROM example")
    assert cur.fetchone() == (1,)
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)
If we try to ingest again, it’ll fail, because the table already exists.
with conn.cursor() as cur:
    try:
        cur.adbc_ingest("example", data, mode="create")
    except conn.ProgrammingError:
        pass
    else:
        raise RuntimeError("Should have failed!")

conn.rollback()
Instead, we can append to the table.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data, mode="append")

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (8,)
We can also choose to create the table if it doesn’t exist, and otherwise append.
with conn.cursor() as cur:
    cur.adbc_ingest("example2", data, mode="create_append")

    cur.execute("SELECT COUNT(*) FROM example2")
    assert cur.fetchone() == (4,)

    cur.adbc_ingest("example2", data, mode="create_append")

    cur.execute("SELECT COUNT(*) FROM example2")
    assert cur.fetchone() == (8,)
Finally, we can replace the table.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (2,)

conn.close()
Create/append to a temporary table
Recipe source: postgresql_create_temp_table.py
ADBC allows creating and appending to temporary tables as well.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
For the purposes of testing, we’ll first make sure the tables we’re about to use don’t exist.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
To create a temporary table, just specify the option “temporary”.
data = pyarrow.table(
    [
        [1, 2, None, 4],
    ],
    schema=pyarrow.schema(
        [
            ("ints", "int32"),
        ]
    ),
)

with conn.cursor() as cur:
    cur.adbc_ingest("example", data, mode="create", temporary=True)

conn.commit()
After ingestion, we can fetch the result.
with conn.cursor() as cur:
    cur.execute("SELECT * FROM example")
    assert cur.fetchone() == (1,)
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)
Temporary tables are separate from regular tables, even if they have the same name.
with conn.cursor() as cur:
    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)

conn.commit()

with conn.cursor() as cur:
Because we have two tables with the same name, we have to explicitly reference the normal (non-temporary) table here.
    cur.execute("SELECT COUNT(*) FROM public.example")
    assert cur.fetchone() == (2,)

    cur.execute("SELECT COUNT(*) FROM example")
    assert cur.fetchone() == (4,)

conn.close()
After closing the connection, the temporary table is implicitly dropped. If we reconnect, the table won’t exist; we’ll see only the ‘normal’ table.
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM example")
        assert cur.fetchone() == (2,)
All the regular ingestion options apply to temporary tables, too. See Create/append to a table from an Arrow dataset for more examples.
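For instance, append mode works with temporary tables too. A minimal sketch (our own addition; the table name example_tmp is hypothetical, and the data table from above is reused):
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        # Create a temporary table, then append to it in a second call.
        cur.adbc_ingest("example_tmp", data, mode="create", temporary=True)
        cur.adbc_ingest("example_tmp", data, mode="append", temporary=True)

        cur.execute("SELECT COUNT(*) FROM example_tmp")
        assert cur.fetchone() == (8,)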
Execute a statement with bind parameters
Recipe source: postgresql_execute_bind.py
ADBC allows using Python and Arrow values as bind parameters. Right now, the PostgreSQL driver only supports bind parameters for queries that don’t generate result sets.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()
We can bind Python values:
with conn.cursor() as cur:
    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])

    cur.execute("SELECT SUM(ints) FROM example")
    assert cur.fetchone() == (4,)
Note
If you're used to the format-string style %s syntax that libraries like psycopg use for bind parameters, note that this is not supported; only the PostgreSQL-native $1 syntax is.
We can also bind Arrow values:
with conn.cursor() as cur:
    data = pyarrow.record_batch(
        [
            [5, 6],
            [7, 8],
        ],
        names=["$1", "$2"],
    )
    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)

    cur.execute("SELECT SUM(ints) FROM example")
    assert cur.fetchone() == (15,)

conn.close()
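Single statements accept parameters through the standard DBAPI execute() as well, not just executemany(). A small sketch (our own addition; the values are arbitrary, and a fresh connection is opened since the one above is closed):
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        # Bind one row of Python values to $1 and $2.
        cur.execute("INSERT INTO example VALUES ($1, $2)", (9, 10))
    conn.commit()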
Get the Arrow schema of a table
Recipe source: postgresql_get_table_schema.py
ADBC lets you get the schema of a table as an Arrow schema.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create some example tables to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
    cur.execute("DROP TABLE IF EXISTS other_schema.example")
    # "values" is a reserved word in PostgreSQL, so it must be quoted.
    cur.execute('CREATE TABLE other_schema.example (strings TEXT, "values" NUMERIC)')

conn.commit()
By default the “active” catalog/schema are assumed.
assert conn.adbc_get_table_schema("example") == pyarrow.schema(
    [
        ("ints", "int32"),
        ("bigints", "int64"),
    ]
)
We can explicitly specify the PostgreSQL schema to get the Arrow schema of a table in a different namespace.
Note
In PostgreSQL, you can only query the database (catalog) that you are connected to. So we cannot specify the catalog here (or rather, there is no point in doing so).
Note that the NUMERIC column is read as a string, because PostgreSQL decimals do not map onto Arrow decimals.
assert conn.adbc_get_table_schema(
    "example",
    db_schema_filter="other_schema",
) == pyarrow.schema(
    [
        ("strings", "string"),
        ("values", "string"),
    ]
)

conn.close()
Get the Arrow schema of a query
Recipe source: postgresql_get_query_schema.py
ADBC lets you get the schema of a result set, without executing the query.
import os

import pyarrow

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to test.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()

expected = pyarrow.schema(
    [
        ("ints", "int32"),
        ("bigints", "int64"),
    ]
)

with conn.cursor() as cur:
    assert cur.adbc_execute_schema("SELECT * FROM example") == expected
PostgreSQL doesn’t know the type here, so it just returns a guess.
    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
        [
            ("res", "string"),
        ]
    )

conn.close()
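If the guess isn't what you want, an explicit cast in the SQL pins down the parameter's type, and we'd expect the reported schema to follow. A sketch (our own addition; it reopens the connection, and the int64 result is an assumption about how the driver maps PostgreSQL's bigint):
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        # The ::bigint cast should make the result column int64.
        assert cur.adbc_execute_schema("SELECT $1::bigint AS res") == pyarrow.schema(
            [
                ("res", "int64"),
            ]
        )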
List catalogs, schemas, and tables
Recipe source: postgresql_list_catalogs.py
ADBC allows listing tables, catalogs, and schemas in the database.
import os

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We’ll create an example table to look for.
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")
    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")

conn.commit()
The data is given as a PyArrow RecordBatchReader.
objects = conn.adbc_get_objects(depth="all").read_all()
We’ll convert it to plain Python data for convenience.
objects = objects.to_pylist()
catalog = objects[0]
assert catalog["catalog_name"] == "postgres"

db_schema = catalog["catalog_db_schemas"][0]
assert db_schema["db_schema_name"] == "public"

tables = db_schema["db_schema_tables"]
example = [table for table in tables if table["table_name"] == "example"]
assert len(example) == 1
example = example[0]

assert example["table_columns"][0]["column_name"] == "ints"
assert example["table_columns"][1]["column_name"] == "bigints"

conn.close()
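adbc_get_objects also accepts filter arguments, so the driver can narrow the listing instead of filtering in Python. A minimal sketch (our own addition; it reopens the connection and filters on the table name):
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    # Restrict the listing to tables named "example".
    objects = conn.adbc_get_objects(
        depth="all",
        table_name_filter="example",
    ).read_all()
    catalogs = objects.to_pylist()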
Connection pooling with SQLAlchemy
Recipe source: postgresql_pool.py
ADBC does not implement connection pooling, as this is not generally a feature of DBAPI drivers. Instead, use a third party connection pool like the one built into SQLAlchemy.
import os

import sqlalchemy.pool

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

source = adbc_driver_postgresql.dbapi.connect(uri)
adbc_driver_manager.dbapi.Connection.adbc_clone() opens a new connection from an existing connection, sharing internal resources where possible. For example, the PostgreSQL driver will share the internal OID cache, saving some overhead on connection.
pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)
We can now get connections out of the pool; SQLAlchemy overrides close() to return the connection to the pool.
Note
SQLAlchemy's wrapper does not support the context manager protocol, unlike the underlying ADBC connection (see the sketch at the end of this recipe for a workaround).
conn = pool.connect()

assert pool.checkedin() == 0
assert pool.checkedout() == 1

with conn.cursor() as cur:
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)

conn.close()

assert pool.checkedin() == 1
assert pool.checkedout() == 0

source.close()
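Because the pooled wrapper is not a context manager, contextlib.closing from the standard library gives similar ergonomics: closing() calls close() on exit, which hands the connection back to the pool. A sketch (our own addition; it assumes the pool from above has not yet been closed):
import contextlib

with contextlib.closing(pool.connect()) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        assert cur.fetchone() == (1,)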
Using Pandas and ADBC
Recipe source: postgresql_pandas.py
ADBC is integrated into pandas, a popular dataframe library. Pandas can use ADBC to exchange data with PostgreSQL and other databases. Compared to using SQLAlchemy or other options, using ADBC with pandas can have better performance, such as by avoiding excess conversions to and from Python objects.
import os

import pandas as pd

import adbc_driver_postgresql.dbapi

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
We'll use pd.DataFrame.to_sql to create a sample table.
data = pd.DataFrame(
    {
        "ints": [1, 2, None, 4],
        "strs": ["a", "b", "c", "d"],
    }
)
data.to_sql("example", conn, if_exists="replace")
conn.commit()
After creating the table, we can pass an ADBC connection and a SQL query to pd.read_sql to get the result set as a pandas DataFrame.
df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)

assert len(df) == 2

conn.close()
Compared to the ADBC interface, pandas offers a more convenient and higher level API, especially for those already using pandas.
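With pandas 2.0 or newer, read_sql can also keep the result Arrow-backed end to end instead of converting to NumPy, which pairs well with an ADBC driver. A sketch (our own addition; assumes pandas >= 2.0 and reuses the example table):
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    # dtype_backend="pyarrow" yields ArrowDtype columns instead of NumPy.
    df = pd.read_sql("SELECT * FROM example", conn, dtype_backend="pyarrow")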
Using Polars and ADBC
Recipe source: postgresql_polars.py
ADBC can be used with Polars, a dataframe library written in Rust. As per its documentation:
If the backend supports returning Arrow data directly then this facility will be used to efficiently instantiate the DataFrame; otherwise, the DataFrame is initialised from row-wise data.
Obviously, ADBC returns Arrow data directly, making ADBC and Polars a natural fit for each other.
import os

import polars as pl

uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
We'll use Polars to create a sample table with polars.DataFrame.write_database(). We don't need to open an ADBC connection ourselves with Polars.
data = pl.DataFrame(
    {
        "ints": [1, 2, None, 4],
        "strs": ["a", "b", "c", "d"],
    }
)
data.write_database("example", uri, engine="adbc", if_table_exists="replace")
After creating the table, we can use polars.read_database_uri() to fetch the result. Again, we can just pass the URI and tell Polars to manage ADBC for us.
df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")

assert len(df) == 2
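Polars can also use a connection you manage yourself, via polars.read_database(). A sketch (our own addition; it opens the ADBC connection explicitly rather than handing Polars the URI):
import adbc_driver_postgresql.dbapi

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    df = pl.read_database("SELECT * FROM example WHERE ints > 1", connection=conn)

assert len(df) == 2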