pyarrow.dataset.Dataset#

class pyarrow.dataset.Dataset#

Bases: _Weakrefable

Collection of data fragments and potentially child datasets.

Arrow Datasets allow you to query against data that has been split across multiple files. This sharding of data may indicate partitioning, which can accelerate queries that only touch some partitions (files).

__init__(*args, **kwargs)#

Methods

__init__(*args, **kwargs)

count_rows(self, Expression filter=None, ...)

Count rows matching the scanner filter.

filter(self, expression)

Apply a row filter to the dataset.

get_fragments(self, Expression filter=None)

Returns an iterator over the fragments in this dataset.

head(self, int num_rows[, columns])

Load the first N rows of the dataset.

join(self, right_dataset, keys[, ...])

Perform a join between this dataset and another one.

join_asof(self, right_dataset, on, by, tolerance)

Perform an asof join between this dataset and another one.

replace_schema(self, Schema schema)

Return a copy of this Dataset with a different schema.

scanner(self[, columns, filter])

Build a scan operation against the dataset.

sort_by(self, sorting, **kwargs)

Sort the Dataset by one or multiple columns.

take(self, indices[, columns])

Select rows of data by index.

to_batches(self[, columns])

Read the dataset as materialized record batches.

to_table(self[, columns])

Read the dataset to an Arrow table.

Attributes

partition_expression

An Expression which evaluates to true for all data viewed by this Dataset.

schema

The common schema of the full Dataset

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Count rows matching the scanner filter.

Parameters:
filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
countint
filter(self, expression)#

Apply a row filter to the dataset.

Parameters:
expressionExpression

The filter that should be applied to the dataset.

Returns:
Dataset
get_fragments(self, Expression filter=None)#

Returns an iterator over the fragments in this dataset.

Parameters:
filterExpression, default None

Return fragments matching the optional filter, either using the partition_expression or internal information like Parquet’s statistics.

Returns:
fragmentsiterator of Fragment
head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Load the first N rows of the dataset.

Parameters:
num_rowsint

The number of rows to load.

columnslist of str, default None

The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
tableTable
join(self, right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)#

Perform a join between this dataset and another one.

Result of the join will be a new dataset, where further operations can be applied.

Parameters:
right_datasetdataset

The dataset to join to the current one, acting as the right dataset in the join operation.

keysstr or list[str]

The columns from current dataset that should be used as keys of the join operation left side.

right_keysstr or list[str], default None

The columns from the right_dataset that should be used as keys on the join operation right side. When None use the same key names as the left dataset.

join_typestr, default “left outer”

The kind of join that should be performed, one of (“left semi”, “right semi”, “left anti”, “right anti”, “inner”, “left outer”, “right outer”, “full outer”)

left_suffixstr, default None

Which suffix to add to right column names. This prevents confusion when the columns in left and right datasets have colliding names.

right_suffixstr, default None

Which suffix to add to the left column names. This prevents confusion when the columns in left and right datasets have colliding names.

coalesce_keysbool, default True

If the duplicated keys should be omitted from one of the sides in the join result.

use_threadsbool, default True

Whenever to use multithreading or not.

Returns:
InMemoryDataset
join_asof(self, right_dataset, on, by, tolerance, right_on=None, right_by=None)#

Perform an asof join between this dataset and another one.

This is similar to a left-join except that we match on nearest key rather than equal keys. Both datasets must be sorted by the key. This type of join is most useful for time series data that are not perfectly aligned.

Optionally match on equivalent keys with “by” before searching with “on”.

Result of the join will be a new Dataset, where further operations can be applied.

Parameters:
right_datasetdataset

The dataset to join to the current one, acting as the right dataset in the join operation.

onstr

The column from current dataset that should be used as the “on” key of the join operation left side.

An inexact match is used on the “on” key, i.e. a row is considered a match if and only if left_on - tolerance <= right_on <= left_on.

The input table must be sorted by the “on” key. Must be a single field of a common type.

Currently, the “on” key must be an integer, date, or timestamp type.

bystr or list[str]

The columns from current dataset that should be used as the keys of the join operation left side. The join operation is then done only for the matches in these columns.

toleranceint

The tolerance for inexact “on” key matching. A right row is considered a match with the left row right.on - left.on <= tolerance. The tolerance may be:

  • negative, in which case a past-as-of-join occurs;

  • or positive, in which case a future-as-of-join occurs;

  • or zero, in which case an exact-as-of-join occurs.

The tolerance is interpreted in the same units as the “on” key.

right_onstr or list[str], default None

The columns from the right_dataset that should be used as the on key on the join operation right side. When None use the same key name as the left dataset.

right_bystr or list[str], default None

The columns from the right_dataset that should be used as by keys on the join operation right side. When None use the same key names as the left dataset.

Returns:
InMemoryDataset
partition_expression#

An Expression which evaluates to true for all data viewed by this Dataset.

replace_schema(self, Schema schema)#

Return a copy of this Dataset with a different schema.

The copy will view the same Fragments. If the new schema is not compatible with the original dataset’s schema then an error will be raised.

Parameters:
schemaSchema

The new dataset schema.

scanner(self, columns=None, filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Build a scan operation against the dataset.

Data is not loaded immediately. Instead, this produces a Scanner, which exposes further operations (e.g. loading all data as a table, counting rows).

See the Scanner.from_dataset() method for further information.

Parameters:
columnslist of str, default None

The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
scannerScanner

Examples

>>> import pyarrow as pa
>>> table = pa.table({'year': [2020, 2022, 2021, 2022, 2019, 2021],
...                   'n_legs': [2, 2, 4, 4, 5, 100],
...                   'animal': ["Flamingo", "Parrot", "Dog", "Horse",
...                              "Brittle stars", "Centipede"]})
>>>
>>> import pyarrow.parquet as pq
>>> pq.write_table(table, "dataset_scanner.parquet")
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("dataset_scanner.parquet")

Selecting a subset of the columns:

>>> dataset.scanner(columns=["year", "n_legs"]).to_table()
pyarrow.Table
year: int64
n_legs: int64
----
year: [[2020,2022,2021,2022,2019,2021]]
n_legs: [[2,2,4,4,5,100]]

Projecting selected columns using an expression:

>>> dataset.scanner(columns={
...     "n_legs_uint": ds.field("n_legs").cast("uint8"),
... }).to_table()
pyarrow.Table
n_legs_uint: uint8
----
n_legs_uint: [[2,2,4,4,5,100]]

Filtering rows while scanning:

>>> dataset.scanner(filter=ds.field("year") > 2020).to_table()
pyarrow.Table
year: int64
n_legs: int64
animal: string
----
year: [[2022,2021,2022,2021]]
n_legs: [[2,4,4,100]]
animal: [["Parrot","Dog","Horse","Centipede"]]
schema#

The common schema of the full Dataset

sort_by(self, sorting, **kwargs)#

Sort the Dataset by one or multiple columns.

Parameters:
sortingstr or list[tuple(name, order)]

Name of the column to use to sort (ascending), or a list of multiple sorting conditions where each entry is a tuple with column name and sorting order (“ascending” or “descending”)

**kwargsdict, optional

Additional sorting options. As allowed by SortOptions

Returns:
InMemoryDataset

A new dataset sorted according to the sort keys.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Select rows of data by index.

Parameters:
indicesArray or array-like

indices of rows to select in the dataset.

columnslist of str, default None

The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
tableTable
to_batches(self, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Read the dataset as materialized record batches.

Parameters:
columnslist of str, default None

The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
record_batchesiterator of RecordBatch
to_table(self, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#

Read the dataset to an Arrow table.

Note that this method reads all the selected data from the dataset into memory.

Parameters:
columnslist of str, default None

The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

filterExpression, default None

Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

batch_sizeint, default 131_072

The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

batch_readaheadint, default 16

The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_readaheadint, default 4

The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

fragment_scan_optionsFragmentScanOptions, default None

Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

use_threadsbool, default True

If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

memory_poolMemoryPool, default None

For memory allocations, if required. If not specified, uses the default pool.

Returns:
tableTable