Reading and Writing Data¶
Recipes related to reading and writing data from disk using Apache Arrow.
Write a Parquet file¶
Given an array with 100 numbers, from 0 to 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
To write it to a Parquet file, we must first create a pyarrow.Table out of it, as Parquet is a format that stores multiple named columns. This gives us a table of a single column which can then be written to a Parquet file.
table = pa.Table.from_arrays([arr], names=["col1"])
Once we have a table, it can be written to a Parquet file using the functions provided by the pyarrow.parquet module
import pyarrow.parquet as pq
pq.write_table(table, "example.parquet", compression=None)
Reading a Parquet file¶
Given a Parquet file, it can be read back to a pyarrow.Table by using the pyarrow.parquet.read_table() function
import pyarrow.parquet as pq
table = pq.read_table("example.parquet")
The resulting table will contain the same columns that existed in the Parquet file, each as a ChunkedArray
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
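For illustration, here is one quick way to check that columns come back as chunked arrays (a minimal sketch, not part of the original recipe):
# Accessing a column of the Table by name returns a ChunkedArray.
col = table["col1"]
print(type(col))       # <class 'pyarrow.lib.ChunkedArray'>
print(col.num_chunks)  # number of chunks backing the column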
Reading a subset of Parquet data¶
When reading a Parquet file with pyarrow.parquet.read_table() it is possible to restrict which columns and rows will be read into memory by using the filters and columns arguments
import pyarrow.parquet as pq
table = pq.read_table("example.parquet",
                      columns=["col1"],
                      filters=[
                          ("col1", ">", 5),
                          ("col1", "<", 10),
                      ])
The resulting table will contain only the projected columns and filtered rows. Refer to the pyarrow.parquet.read_table() documentation for details about the syntax for filters.
print(table)
pyarrow.Table
col1: int64
----
col1: [[6,7,8,9]]
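Recent pyarrow versions also accept a pyarrow.compute expression for the filters argument; a sketch of the equivalent call, assuming such a version is installed:
import pyarrow.compute as pc

# Same projection and filter, expressed with a compute expression
# (assumes a pyarrow version where read_table() accepts Expression filters).
table = pq.read_table("example.parquet",
                      columns=["col1"],
                      filters=(pc.field("col1") > 5) & (pc.field("col1") < 10))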
Saving Arrow Arrays to disk¶
Apart from using Arrow to read and save common file formats like Parquet, it is possible to dump data in the raw Arrow format, which allows direct memory mapping of data from disk. This format is called the Arrow IPC format.
Given an array with 100 numbers, from 0 to 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
We can save the array by making a pyarrow.RecordBatch out of it and writing the record batch to disk.
schema = pa.schema([
    pa.field('nums', arr.type)
])

with pa.OSFile('arraydata.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, schema=schema) as writer:
        batch = pa.record_batch([arr], schema=schema)
        writer.write(batch)
If we were to save multiple arrays into the same file, we would just have to adapt the schema accordingly and add them all to the record_batch call.
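For instance, a minimal sketch of writing two arrays side by side (the second array and the file name are made up for illustration):
arr2 = pa.array(np.arange(100, 200))

# The schema now declares two fields, one per array.
schema = pa.schema([
    pa.field('nums', arr.type),
    pa.field('more_nums', arr2.type)
])

with pa.OSFile('arraydata_two.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, schema=schema) as writer:
        # Both arrays go into the same record batch, in schema order.
        writer.write(pa.record_batch([arr, arr2], schema=schema))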
Memory Mapping Arrow Arrays from disk¶
Arrow arrays that have been written to disk in the Arrow IPC format can be memory mapped back directly from the disk.
with pa.memory_map('arraydata.arrow', 'r') as source:
    loaded_arrays = pa.ipc.open_file(source).read_all()

arr = loaded_arrays[0]
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
Writing CSV files¶
It is possible to write an Arrow pyarrow.Table to a CSV file using the pyarrow.csv.write_csv() function
arr = pa.array(range(100))
table = pa.Table.from_arrays([arr], names=["col1"])
import pyarrow.csv
pa.csv.write_csv(table, "table.csv",
                 write_options=pa.csv.WriteOptions(include_header=True))
Writing CSV files incrementally¶
If you need to write data to a CSV file incrementally as you generate or retrieve it, and you don’t want to keep the whole table in memory to write it at once, it’s possible to write the data incrementally using pyarrow.csv.CSVWriter
schema = pa.schema([("col1", pa.int32())])

with pa.csv.CSVWriter("table.csv", schema=schema) as writer:
    for chunk in range(10):
        datachunk = range(chunk*10, (chunk+1)*10)
        table = pa.Table.from_arrays([pa.array(datachunk)], schema=schema)
        writer.write(table)
It’s equally possible to write pyarrow.RecordBatch objects, by passing them to write() just as you would tables.
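For example, a sketch of the same loop writing record batches instead (the file name and the explicit int32 cast are choices made here for illustration):
schema = pa.schema([("col1", pa.int32())])

with pa.csv.CSVWriter("table_batches.csv", schema=schema) as writer:
    for chunk in range(10):
        datachunk = pa.array(range(chunk*10, (chunk+1)*10), type=pa.int32())
        # write() accepts record batches as well as tables.
        writer.write(pa.record_batch([datachunk], schema=schema))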
Reading CSV files¶
Arrow can read pyarrow.Table
entities from CSV using an
optimized codepath that can leverage multiple threads.
import pyarrow.csv
table = pa.csv.read_csv("table.csv")
Arrow will do its best to infer data types. Further options can be provided to pyarrow.csv.read_csv() through pyarrow.csv.ConvertOptions, for example to override the inferred type of a column (a sketch follows after the output below).
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
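For example, a sketch of overriding the inferred type of a column through pyarrow.csv.ConvertOptions (the int32 choice is arbitrary):
# Force "col1" to int32 instead of the inferred int64.
convert_options = pa.csv.ConvertOptions(column_types={"col1": pa.int32()})
table_int32 = pa.csv.read_csv("table.csv", convert_options=convert_options)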
Writing Partitioned Datasets¶
When your dataset is big it usually makes sense to split it into multiple separate files. You can do this manually or use pyarrow.dataset.write_dataset() to let Arrow do the work of splitting the data into chunks for you.
The partitioning argument lets you tell pyarrow.dataset.write_dataset() by which columns the data should be split.
For example, given 100 birthdays between 2000 and 2009
import numpy.random
data = pa.table({"day": numpy.random.randint(1, 31, size=100),
                 "month": numpy.random.randint(1, 12, size=100),
                 "year": [2000 + x // 10 for x in range(100)]})
Then we could partition the data by the year column so that it gets saved in 10 different files:
import pyarrow as pa
import pyarrow.dataset as ds
ds.write_dataset(data, "./partitioned", format="parquet",
                 partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))
Arrow will partition datasets in subdirectories by default, which will result in 10 different directories named with the value of the partitioning column, each containing a file with the subset of the data for that partition:
from pyarrow import fs
localfs = fs.LocalFileSystem()
partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))
for file in files:
    print(file)
./partitioned/2000/part-0.parquet
./partitioned/2001/part-0.parquet
./partitioned/2002/part-0.parquet
./partitioned/2003/part-0.parquet
./partitioned/2004/part-0.parquet
./partitioned/2005/part-0.parquet
./partitioned/2006/part-0.parquet
./partitioned/2007/part-0.parquet
./partitioned/2008/part-0.parquet
./partitioned/2009/part-0.parquet
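When reading the data back, the same partitioning schema can be passed to pyarrow.dataset.dataset() so that the year values are recovered as a column; a sketch of that, reusing the layout written above:
# Re-discover the partitioned files; the partitioning schema tells Arrow
# how to turn the directory names back into a "year" column.
dataset = ds.dataset("./partitioned", format="parquet",
                     partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))
print(dataset.to_table().column_names)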
Reading Partitioned data¶
In some cases, your dataset might be composed of multiple separate files each containing a piece of the data.
In this case the pyarrow.dataset.dataset()
function provides
an interface to discover and read all those files as a single big dataset.
For example if we have a structure like:
examples/
├── dataset1.parquet
├── dataset2.parquet
└── dataset3.parquet
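(For this recipe the three files are assumed to each contain a single col1 column with 10 consecutive integers, matching the output shown below; a minimal sketch that creates such a layout:)
import os
import pyarrow as pa
import pyarrow.parquet as pq

os.makedirs("examples", exist_ok=True)

# Three small Parquet files, 10 rows each, sharing the same "col1" column.
for i in range(3):
    values = pa.array(range(i * 10, (i + 1) * 10))
    pq.write_table(pa.Table.from_arrays([values], names=["col1"]),
                   f"examples/dataset{i + 1}.parquet")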
Then, pointing the pyarrow.dataset.dataset() function to the examples directory will discover those Parquet files and expose them all as a single pyarrow.dataset.Dataset:
import pyarrow.dataset as ds
dataset = ds.dataset("./examples", format="parquet")
print(dataset.files)
['./examples/dataset1.parquet', './examples/dataset2.parquet', './examples/dataset3.parquet']
The whole dataset can be viewed as a single big table using pyarrow.dataset.Dataset.to_table(). While each Parquet file contains only 10 rows, converting the dataset to a table will expose them as a single Table.
table = dataset.to_table()
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,5,6,7,8,9],[10,11,12,13,14,15,16,17,18,19],[20,21,22,23,24,25,26,27,28,29]]
Notice that converting to a table will force all data to be loaded in memory. For big datasets this is usually not what you want.
For this reason, it might be better to rely on the pyarrow.dataset.Dataset.to_batches() method, which will iteratively load the dataset one chunk of data at a time, returning a pyarrow.RecordBatch for each one of them.
for record_batch in dataset.to_batches():
    col1 = record_batch.column("col1")
    print(f"{col1._name} = {col1[0]} .. {col1[-1]}")
col1 = 0 .. 9
col1 = 10 .. 19
col1 = 20 .. 29
Reading Partitioned Data from S3¶
The pyarrow.dataset.Dataset
is also able to abstract
partitioned data coming from remote sources like S3 or HDFS.
from pyarrow import fs
# List content of s3://ursa-labs-taxi-data/2011
s3 = fs.SubTreeFileSystem(
    "ursa-labs-taxi-data",
    fs.S3FileSystem(region="us-east-2", anonymous=True)
)

for entry in s3.get_file_info(fs.FileSelector("2011", recursive=True)):
    if entry.type == fs.FileType.File:
        print(entry.path)
2011/01/data.parquet
2011/02/data.parquet
2011/03/data.parquet
2011/04/data.parquet
2011/05/data.parquet
2011/06/data.parquet
2011/07/data.parquet
2011/08/data.parquet
2011/09/data.parquet
2011/10/data.parquet
2011/11/data.parquet
2011/12/data.parquet
The data in the bucket can be loaded as a single big dataset partitioned by month using
dataset = ds.dataset("s3://ursa-labs-taxi-data/2011",
                     partitioning=["month"])

for f in dataset.files[:10]:
    print(f)
print("...")
ursa-labs-taxi-data/2011/01/data.parquet
ursa-labs-taxi-data/2011/02/data.parquet
ursa-labs-taxi-data/2011/03/data.parquet
ursa-labs-taxi-data/2011/04/data.parquet
ursa-labs-taxi-data/2011/05/data.parquet
ursa-labs-taxi-data/2011/06/data.parquet
ursa-labs-taxi-data/2011/07/data.parquet
ursa-labs-taxi-data/2011/08/data.parquet
ursa-labs-taxi-data/2011/09/data.parquet
ursa-labs-taxi-data/2011/10/data.parquet
...
The dataset can then be used with pyarrow.dataset.Dataset.to_table() or pyarrow.dataset.Dataset.to_batches() just as you would for a local one.
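For example, a sketch of peeking at the first couple of record batches without materializing the whole dataset (the limit of two batches is arbitrary, and the scan still downloads the data it touches):
import itertools

# Stream only the first two record batches from S3 and report their sizes.
for batch in itertools.islice(dataset.to_batches(), 2):
    print(batch.num_rows)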
Note
It is also possible to load partitioned data in the Arrow IPC format or in the Feather format.
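For instance, a sketch of writing and reading back the birthday table from the earlier recipe as a partitioned Feather dataset (the directory name is chosen here for illustration):
# Same partitioning as before, but with Feather (Arrow IPC) files.
ds.write_dataset(data, "./partitioned_feather", format="feather",
                 partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))

feather_dataset = ds.dataset("./partitioned_feather", format="feather",
                             partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))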
Warning
If the S3 example above throws an error, the most likely reason is that your AWS credentials are not set. Follow these instructions to get an AWS Access Key Id and AWS Secret Access Key: AWS Credentials.
The credentials are normally stored in the ~/.aws/credentials file (on Mac or Linux) or in C:\Users\<USERNAME>\.aws\credentials (on Windows). You will need to either create or update this file in the appropriate location.
The contents of the file should look like this:
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
Write a Feather file¶
Given an array with 100 numbers, from 0 to 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
To write it to a Feather file, we must first create a pyarrow.Table out of it, as Feather stores multiple named columns. This gives us a table of a single column which can then be written to a Feather file.
table = pa.Table.from_arrays([arr], names=["col1"])
Once we have a table, it can be written to a Feather file using the functions provided by the pyarrow.feather module
import pyarrow.feather as ft
ft.write_feather(table, 'example.feather')
Reading a Feather file¶
Given a Feather file, it can be read back to a pyarrow.Table by using the pyarrow.feather.read_table() function
import pyarrow.feather as ft
table = ft.read_table("example.feather")
The resulting table will contain the same columns that existed in the Feather file, each as a ChunkedArray
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
Reading Line Delimited JSON¶
Arrow has built-in support for line-delimited JSON. Each line represents a row of data as a JSON object.
Given some data in a file where each line is a JSON object containing a row of data:
import tempfile

with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f:
    f.write('{"a": 1, "b": 2.0, "c": 1}\n')
    f.write('{"a": 3, "b": 3.0, "c": 2}\n')
    f.write('{"a": 5, "b": 4.0, "c": 3}\n')
    f.write('{"a": 7, "b": 5.0, "c": 4}\n')
The content of the file can be read back to a pyarrow.Table using pyarrow.json.read_json():
import pyarrow as pa
import pyarrow.json
table = pa.json.read_json(f.name)
print(table.to_pydict())
{'a': [1, 3, 5, 7], 'b': [2.0, 3.0, 4.0, 5.0], 'c': [1, 2, 3, 4]}
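If you prefer not to rely on type inference, one possible sketch of pinning the schema explicitly through pyarrow.json.ParseOptions (the declared types simply match the data above):
# Read the same file again, but with an explicitly declared schema
# instead of letting Arrow infer the column types.
explicit_schema = pa.schema([("a", pa.int64()), ("b", pa.float64()), ("c", pa.int64())])
table_typed = pa.json.read_json(
    f.name,
    parse_options=pa.json.ParseOptions(explicit_schema=explicit_schema))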
Writing Compressed Data¶
Arrow provides support for writing files in compressed formats, both for formats that provide compression natively like Parquet or Feather, and for formats that don’t support compression out of the box like CSV.
Given a table:
table = pa.table([
    pa.array([1, 2, 3, 4, 5])
], names=["numbers"])
Writing compressed Parquet or Feather data is driven by the compression argument to the pyarrow.feather.write_feather() and pyarrow.parquet.write_table() functions:
pa.feather.write_feather(table, "compressed.feather",
                         compression="lz4")
pa.parquet.write_table(table, "compressed.parquet",
                       compression="lz4")
You can refer to each of those functions’ documentation for a complete list of supported compression formats.
Note
Arrow actually uses compression by default when writing Parquet or Feather files: Feather is compressed with lz4 by default and Parquet with snappy.
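If you need uncompressed files instead (for example for tools that can’t read compressed ones), a sketch of disabling the defaults explicitly (the file names are chosen for illustration):
# Explicitly opt out of the default compression.
pa.feather.write_feather(table, "uncompressed.feather",
                         compression="uncompressed")
pa.parquet.write_table(table, "uncompressed.parquet",
                       compression=None)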
For formats that don’t support compression natively, like CSV, it’s possible to save compressed data using pyarrow.CompressedOutputStream:
with pa.CompressedOutputStream("compressed.csv.gz", "gzip") as out:
    pa.csv.write_csv(table, out)
This requires decompressing the file when reading it back,
which can be done using pyarrow.CompressedInputStream
as explained in the next recipe.
Reading Compressed Data¶
Arrow provides support for reading compressed files, both for formats that provide it natively like Parquet or Feather, and for files in formats that don’t support compression natively, like CSV, but have been compressed by an application.
Reading compressed formats that have native support for compression doesn’t require any special handling. We can for example read back the Parquet and Feather files we wrote in the previous recipe by simply invoking pyarrow.feather.read_table() and pyarrow.parquet.read_table():
table_feather = pa.feather.read_table("compressed.feather")
print(table_feather)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
table_parquet = pa.parquet.read_table("compressed.parquet")
print(table_parquet)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
Reading data from formats that don’t have native support for compression instead involves decompressing them before decoding them. This can be done using the pyarrow.CompressedInputStream class, which wraps files with a decompress operation before the result is provided to the actual read function.
For example to read a compressed CSV file:
with pa.CompressedInputStream(pa.OSFile("compressed.csv.gz"), "gzip") as input:
    table_csv = pa.csv.read_csv(input)
print(table_csv)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
Note
In the case of CSV, Arrow is actually smart enough to try detecting compressed files using the file extension. So if your file is named *.gz or *.bz2, the pyarrow.csv.read_csv() function will try to decompress it accordingly
table_csv2 = pa.csv.read_csv("compressed.csv.gz")
print(table_csv2)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]