Tabular Datasets

Warning

The pyarrow.dataset module is experimental (specifically the classes), and a stable API is not yet guaranteed.

The pyarrow.dataset module provides functionality to efficiently work with tabular, potentially larger than memory, and multi-file datasets. This includes:

  • A unified interface that supports different sources and file formats (Parquet, Feather / Arrow IPC, and CSV files) and different file systems (local, cloud).

  • Discovery of sources (crawling directories, handle directory-based partitioned datasets, basic schema normalization, ..)

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.

Currently, only Parquet, Feather / Arrow IPC, and CSV files are supported. The goal is to expand this in the future to other file formats and data sources (e.g. database connections).

For those familiar with the existing pyarrow.parquet.ParquetDataset for reading Parquet datasets: pyarrow.dataset’s goal is similar but not specific to the Parquet format and not tied to Python: the same datasets API is exposed in the R bindings or Arrow. In addition pyarrow.dataset boasts improved performance and new features (e.g. filtering within files rather than only on partition keys).

Reading Datasets

For the examples below, let’s create a small dataset consisting of a directory with two parquet files:

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: base = pathlib.Path(tempfile.gettempdir())

In [6]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [7]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [8]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [9]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

Dataset discovery

A Dataset object can be created with the dataset() function. We can pass it the path to the directory containing the data files:

In [10]: import pyarrow.dataset as ds

In [11]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [12]: dataset
Out[12]: <pyarrow._dataset.FileSystemDataset at 0x7f056e430870>

In addition to searching a base directory, dataset() accepts a path to a single file or a list of file paths.

Creating a Dataset object does not begin reading the data itself. If needed, it only crawls the directory to find all the files:

In [13]: dataset.files
Out[13]: ['/tmp/parquet_dataset/data1.parquet', '/tmp/parquet_dataset/data2.parquet']

… and infers the dataset’s schema (by default from the first file):

In [14]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

Using the Dataset.to_table() method we can read the dataset (or a portion of it) into a pyarrow Table (note that depending on the size of your dataset this can require a lot of memory, see below on filtering / iterative loading):

In [15]: dataset.to_table()
Out[15]: 
pyarrow.Table
a: int64
b: double
c: int64

# converting to pandas to see the contents of the scanned table
In [16]: dataset.to_table().to_pandas()
Out[16]: 
   a         b  c
0  0 -1.506306  1
1  1 -0.038217  2
2  2  0.562543  1
3  3 -1.778690  2
4  4  0.905818  1
5  5  0.277364  2
6  6  1.445442  1
7  7 -0.686318  2
8  8 -1.107535  1
9  9  0.286152  2

Reading different file formats

The above examples use Parquet files as dataset source but the Dataset API provides a consistent interface across multiple file formats and filesystems. Currently, Parquet, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

In [17]: import pyarrow.feather as feather

In [18]: feather.write_feather(table, base / "data.feather")

…then we can read the Feather file using the same functions, but with specifying format="feather":

In [19]: dataset = ds.dataset(base / "data.feather", format="feather")

In [20]: dataset.to_table().to_pandas().head()
Out[20]: 
   a         b  c
0  0 -1.506306  1
1  1 -0.038217  2
2  2  0.562543  1
3  3 -1.778690  2
4  4  0.905818  1

Customizing file formats

The format name as a string, like:

ds.dataset(..., format="parquet")

is short hand for a default constructed ParquetFileFormat:

ds.dataset(..., format=ds.ParquetFileForma())

The FileFormat objects can be customized using keywords. For example:

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

Will configure column "a" to be dictionary encoded on scan.

Filtering data

To avoid reading all data when only needing a subset, the columns and filter keywords can be used.

The columns keyword can be used to only read the specified columns:

In [21]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [22]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[22]: 
   a         b
0  0 -1.506306
1  1 -0.038217
2  2  0.562543
3  3 -1.778690
4  4  0.905818
5  5  0.277364
6  6  1.445442
7  7 -0.686318
8  8 -1.107535
9  9  0.286152

With the filter keyword, rows which do not match the filter predicate will not be included in the returned table. The keyword expects a boolean Expression referencing at least one of the columns:

In [23]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[23]: 
   a         b  c
0  7 -0.686318  2
1  8 -1.107535  1
2  9  0.286152  2

In [24]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[24]: 
   a         b  c
0  1 -0.038217  2
1  3 -1.778690  2
2  5  0.277364  2
3  7 -0.686318  2
4  9  0.286152  2

The easiest way to construct those Expression objects is by using the field() helper function. Any column - not just partition columns - can be referenced using the field() function (which creates a FieldExpression). Operator overloads are provided to compose filters including the comparisons (equal, larger/less than, etc), set membership testing, and boolean combinations (&, |, ~):

In [25]: ds.field('a') != 3
Out[25]: <pyarrow.dataset.Expression (a != 3)>

In [26]: ds.field('a').isin([1, 2, 3])
Out[26]: 
<pyarrow.dataset.Expression is_in(a, value_set=[
  1,
  2,
  3
], skip_nulls)>

In [27]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[27]: <pyarrow.dataset.Expression ((a > b) and (b > 1))>

Note that Expression objects can not be combined by python logical operators and, or and not.

Projecting columns

The columns keyword can be used to read a subset of the columns of the dataset by passing it a list of column names. The keyword can also be used for more complex projections in combination with expressions.

In this case, we pass it a dictionary with the keys being the resulting column names and the values the expression that is used to construct the column values:

In [28]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }
   ....: 

In [29]: dataset.to_table(columns=projection).to_pandas().head()
Out[29]: 
   a_renamed  b_as_float32    c_1
0          0     -1.506306   True
1          1     -0.038217  False
2          2      0.562543   True
3          3     -1.778690  False
4          4      0.905818   True

The dictionary also determines the column selection (only the keys in the dictionary will be present as columns in the resulting table). If you want to include a derived column in addition to the existing columns, you can build up the dictionary from the dataset schema:

In [30]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [31]: projection.update({"b_large": ds.field("b") > 1})

In [32]: dataset.to_table(columns=projection).to_pandas().head()
Out[32]: 
   a         b  c  b_large
0  0 -1.506306  1    False
1  1 -0.038217  2    False
2  2  0.562543  1    False
3  3 -1.778690  2    False
4  4  0.905818  1    False

Reading partitioned data

Above, a dataset consisting of a flat directory with files was shown. However, a dataset can exploit a nested directory structure defining a partitioned dataset, where the sub-directory names hold information about which subset of the data is stored in that directory.

For example, a dataset partitioned by year and month may look like on disk:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The above partitioning scheme is using “/key=value/” directory names, as found in Apache Hive.

Let’s create a small partitioned dataset. The write_to_dataset() function can write such hive-like partitioned datasets.

In [33]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})
   ....: 

In [34]: pq.write_to_dataset(table, str(base / "parquet_dataset_partitioned"),
   ....:                     partition_cols=['part'])
   ....: 

The above created a directory with two subdirectories (“part=a” and “part=b”), and the Parquet files written in those directories no longer include the “part” column.

Reading this dataset with dataset(), we now specify that the dataset should use a hive-like partitioning scheme with the partitioning keyword:

In [35]: dataset = ds.dataset(str(base / "parquet_dataset_partitioned"), format="parquet",
   ....:                      partitioning="hive")
   ....: 

In [36]: dataset.files
Out[36]: 
['/tmp/parquet_dataset_partitioned/part=a/7d62d28012b94681a6693e637b81ee45.parquet',
 '/tmp/parquet_dataset_partitioned/part=b/4faa10073c9d4fc3a4abf64ee6036810.parquet']

Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:

In [37]: dataset.to_table().to_pandas().head(3)
Out[37]: 
   a         b  c part
0  0  1.005422  1    a
1  1 -0.995243  2    a
2  2 -0.214660  1    a

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

In [38]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[38]: 
   a         b  c part
0  5 -0.470175  2    b
1  6 -0.648736  1    b
2  7 -1.238260  2    b
3  8  0.207214  1    b
4  9  1.360630  2    b

Different partitioning schemes

The above example uses a hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this passing the partitioning="hive" keyword. In this case, the types of the partition keys are inferred from the file paths.

It is also possible to explicitly define the schema of the partition keys using the partitioning() function. For example:

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

“Directory partitioning” is also supported, where the segments in the file path represent the values of the partition keys without including the name (the field name are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.

Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:

part = ds.partitioning(field_names=["year", "month", "day"])

Directory partitioning also supports providing a full schema rather than inferring types from file paths.

Reading from cloud storage

In addition to local files, pyarrow also supports reading from cloud storage. Currently, HDFS and Amazon S3-compatible storage are supported.

When passing a file URI, the file system will be inferred. For example, specifying a S3 path:

dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])

Typically, you will want to customize the connection parameters, and then a file system object can be created and passed to the filesystem keyword:

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=s3,
                     partitioning=["year", "month"])

The currently available classes are S3FileSystem and HadoopFileSystem. See the Filesystem Interface docs for more details.

Reading from Minio

In addition to cloud storage, pyarrow also supports reading from a MinIO object storage instance emulating S3 APIs. Paired with toxiproxy, this is useful for testing or benchmarking.

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint="localhost:9000")
dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=minio,
                     partitioning=["year", "month"])

Working with Parquet Datasets

While the Datasets API provides a unified interface to different file formats, some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a _metadata file with partitioned datasets which includes information about the schema and the row group metadata of the full dataset. Using such file can give a more efficient creation of a parquet Dataset, since it does not need to infer the schema and crawl the directories for all Parquet files (this is especially the case for filesystems where accessing files is expensive). The parquet_dataset() function allows to create a Dataset from a partitioned dataset with a _metadata file:

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

By default, the constructed Dataset object for Parquet datasets maps each fragment to a single Parquet file. If you want fragments mapping to each row group of a Parquet file, you can use the split_by_row_group() method of the fragments:

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

This method returns a list of new Fragments mapping to each row group of the original Fragment (Parquet file). Both get_fragments() and split_by_row_group() accept an optional filter expression to get a filtered list of fragments.

Manual specification of the Dataset

The dataset() function allows easy creation of a Dataset viewing a directory, crawling all subdirectories for files and partitioning information. However sometimes discovery is not required and the dataset’s files and partitions are already known (for example, when this information is stored in metadata). In this case it is possible to create a Dataset explicitly without any automatic discovery or inference.

For the example here, we are going to use a dataset where the file names contain additional partitioning information:

# creating a dummy dataset: directory with two files
In [39]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [40]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [41]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

To create a Dataset from a list of files, we need to specify the paths, schema, format, filesystem, and partition expressions manually:

In [43]: from pyarrow import fs

In [44]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [45]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
   ....: 

Since we specified the “partition expressions” for our files, this information is materialized as columns when reading the data and can be used for filtering:

In [46]: dataset.to_table().to_pandas()
Out[46]: 
   year  col1      col2
0  2018     0  0.456925
1  2018     1  0.553757
2  2018     2 -0.105702
3  2019     0  0.456925
4  2019     1  0.553757
5  2019     2 -0.105702

In [47]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[47]: 
   year  col1      col2
0  2019     0  0.456925
1  2019     1  0.553757
2  2019     2 -0.105702

Manual scheduling

The to_table() method loads all selected data into memory at once resulting in a pyarrow Table. Alternatively, a dataset can also be scanned one RecordBatch at a time in an iterative manner using the scan() method:

for scan_task in dataset.scan(columns=[...], filter=...):
    for record_batch in scan_task.execute():
        # process the record batch