pyarrow.dataset.Dataset#
- class pyarrow.dataset.Dataset#
Bases: _Weakrefable
Collection of data fragments and potentially child datasets.
Arrow Datasets allow you to query against data that has been split across multiple files. This sharding of data may indicate partitioning, which can accelerate queries that only touch some partitions (files).
- __init__(*args, **kwargs)#
Methods
__init__(*args, **kwargs)
count_rows(self, Expression filter=None, ...): Count rows matching the scanner filter.
filter(self, expression): Apply a row filter to the dataset.
get_fragments(self, Expression filter=None): Returns an iterator over the fragments in this dataset.
head(self, int num_rows[, columns]): Load the first N rows of the dataset.
join(self, right_dataset, keys[, ...]): Perform a join between this dataset and another one.
join_asof(self, right_dataset, on, by, tolerance): Perform an asof join between this dataset and another one.
replace_schema(self, Schema schema): Return a copy of this Dataset with a different schema.
scanner(self[, columns, filter]): Build a scan operation against the dataset.
sort_by(self, sorting, **kwargs): Sort the Dataset by one or multiple columns.
take(self, indices[, columns]): Select rows of data by index.
to_batches(self[, columns]): Read the dataset as materialized record batches.
to_table(self[, columns]): Read the dataset to an Arrow table.
Attributes
partition_expression: An Expression which evaluates to true for all data viewed by this Dataset.
schema: The common schema of the full Dataset.
- count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Count rows matching the scanner filter.
- Parameters:
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- count : int
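Examples
A minimal sketch (added here for illustration, not from the original docstring; the file name count_rows_example.parquet is made up) showing how a filter restricts the count:
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> import pyarrow.dataset as ds
>>> pq.write_table(pa.table({'year': [2019, 2020, 2021, 2022]}),
...                "count_rows_example.parquet")
>>> dataset = ds.dataset("count_rows_example.parquet")
>>> dataset.count_rows(filter=ds.field("year") > 2020)
2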
- filter(self, expression)#
Apply a row filter to the dataset.
- Parameters:
- expression : Expression
The filter that should be applied to the dataset.
- Returns:
- dataset : Dataset
A new dataset with the filter applied.
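Examples
A minimal sketch (added here for illustration, not from the original docstring). Because the returned dataset carries the filter, later scans only see matching rows:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'n_legs': [2, 4, 5, 100]}))
>>> dataset.filter(ds.field("n_legs") > 4).to_table().num_rows
2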
- get_fragments(self, Expression filter=None)#
Returns an iterator over the fragments in this dataset.
- Parameters:
- filter : Expression, default None
Return fragments matching the optional filter, either using the partition_expression or internal information like Parquet’s statistics.
- Returns:
- fragments : iterator of Fragment
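Examples
A minimal sketch (added here for illustration; the file name fragments_example.parquet is made up): a dataset backed by a single Parquet file exposes one fragment.
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> import pyarrow.dataset as ds
>>> pq.write_table(pa.table({'x': [1, 2]}), "fragments_example.parquet")
>>> dataset = ds.dataset("fragments_example.parquet")
>>> len(list(dataset.get_fragments()))
1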
- head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Load the first N rows of the dataset.
- Parameters:
- num_rows : int
The number of rows to load.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- table : Table
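Examples
A minimal sketch (added here for illustration, not from the original docstring) combining num_rows with a column projection:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'year': [2019, 2020, 2021],
...                                'n_legs': [5, 2, 4]}))
>>> t = dataset.head(2, columns=["year"])
>>> t.num_rows, t.column_names
(2, ['year'])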
- join(self, right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)#
Perform a join between this dataset and another one.
Result of the join will be a new dataset, where further operations can be applied.
- Parameters:
- right_dataset : dataset
The dataset to join to the current one, acting as the right dataset in the join operation.
- keys : str or list[str]
The columns from current dataset that should be used as keys of the join operation left side.
- right_keys : str or list[str], default None
The columns from the right_dataset that should be used as keys on the join operation right side. When None, use the same key names as the left dataset.
- join_type : str, default “left outer”
The kind of join that should be performed, one of (“left semi”, “right semi”, “left anti”, “right anti”, “inner”, “left outer”, “right outer”, “full outer”).
- left_suffix : str, default None
Which suffix to add to left column names. This prevents confusion when the columns in left and right datasets have colliding names.
- right_suffix : str, default None
Which suffix to add to the right column names. This prevents confusion when the columns in left and right datasets have colliding names.
- coalesce_keys : bool, default True
If the duplicated keys should be omitted from one of the sides in the join result.
- use_threads : bool, default True
Whether to use multithreading or not.
- Returns:
- dataset : InMemoryDataset
A new dataset resulting from the join.
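Examples
A minimal sketch (added here for illustration, not from the original docstring): the default left outer join keeps every row of the left dataset.
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> left = ds.dataset(pa.table({'id': [1, 2, 3], 'year': [2019, 2020, 2021]}))
>>> right = ds.dataset(pa.table({'id': [2, 3, 4], 'n_legs': [2, 4, 5]}))
>>> left.join(right, keys="id").to_table().num_rows
3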
- join_asof(self, right_dataset, on, by, tolerance, right_on=None, right_by=None)#
Perform an asof join between this dataset and another one.
This is similar to a left-join except that we match on nearest key rather than equal keys. Both datasets must be sorted by the key. This type of join is most useful for time series data that are not perfectly aligned.
Optionally match on equivalent keys with “by” before searching with “on”.
Result of the join will be a new Dataset, where further operations can be applied.
- Parameters:
- right_dataset : dataset
The dataset to join to the current one, acting as the right dataset in the join operation.
- on : str
The column from current dataset that should be used as the “on” key of the join operation left side.
An inexact match is used on the “on” key, i.e. a row is considered a match if and only if left_on - tolerance <= right_on <= left_on.
The input table must be sorted by the “on” key. Must be a single field of a common type.
Currently, the “on” key must be an integer, date, or timestamp type.
- by : str or list[str]
The columns from current dataset that should be used as the keys of the join operation left side. The join operation is then done only for the matches in these columns.
- tolerance : int
The tolerance for inexact “on” key matching. A right row is considered a match with a left row if right.on - left.on <= tolerance. The tolerance may be:
- negative, in which case a past-as-of-join occurs;
- or positive, in which case a future-as-of-join occurs;
- or zero, in which case an exact-as-of-join occurs.
The tolerance is interpreted in the same units as the “on” key.
- right_on : str or list[str], default None
The columns from the right_dataset that should be used as the “on” key on the join operation right side. When None, use the same key name as the left dataset.
- right_by : str or list[str], default None
The columns from the right_dataset that should be used as by keys on the join operation right side. When None, use the same key names as the left dataset.
- Returns:
- dataset : InMemoryDataset
A new dataset resulting from the as-of join.
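Examples
A minimal sketch (added here for illustration, assuming a pyarrow version that provides Dataset.join_asof): with a positive tolerance, each left row matches the nearest right row whose “on” value is at most tolerance ahead, and all left rows are kept.
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> left = ds.dataset(pa.table({'ts': [1, 5, 10], 'key': ['a', 'a', 'a']}))
>>> right = ds.dataset(pa.table({'ts': [2, 6, 12], 'key': ['a', 'a', 'a'],
...                              'v': [10, 20, 30]}))
>>> left.join_asof(right, on="ts", by="key", tolerance=2).to_table().num_rows
3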
- partition_expression#
An Expression which evaluates to true for all data viewed by this Dataset.
- replace_schema(self, Schema schema)#
Return a copy of this Dataset with a different schema.
The copy will view the same Fragments. If the new schema is not compatible with the original dataset’s schema then an error will be raised.
- Parameters:
- schema : Schema
The new dataset schema.
- Returns:
- dataset : Dataset
A copy of this Dataset with the new schema.
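Examples
A minimal sketch (added here for illustration, not from the original docstring): attaching metadata is a schema change that stays compatible with the original schema.
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'x': [1, 2]}))
>>> new_dataset = dataset.replace_schema(dataset.schema.with_metadata({'owner': 'docs'}))
>>> new_dataset.schema.metadata
{b'owner': b'docs'}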
- scanner(self, columns=None, filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Build a scan operation against the dataset.
Data is not loaded immediately. Instead, this produces a Scanner, which exposes further operations (e.g. loading all data as a table, counting rows).
See the Scanner.from_dataset() method for further information.
- Parameters:
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- scanner : Scanner
Examples
>>> import pyarrow as pa
>>> table = pa.table({'year': [2020, 2022, 2021, 2022, 2019, 2021],
...                   'n_legs': [2, 2, 4, 4, 5, 100],
...                   'animal': ["Flamingo", "Parrot", "Dog", "Horse",
...                              "Brittle stars", "Centipede"]})
>>> import pyarrow.parquet as pq
>>> pq.write_table(table, "dataset_scanner.parquet")
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("dataset_scanner.parquet")
Selecting a subset of the columns:
>>> dataset.scanner(columns=["year", "n_legs"]).to_table()
pyarrow.Table
year: int64
n_legs: int64
----
year: [[2020,2022,2021,2022,2019,2021]]
n_legs: [[2,2,4,4,5,100]]
Projecting selected columns using an expression:
>>> dataset.scanner(columns={
...     "n_legs_uint": ds.field("n_legs").cast("uint8"),
... }).to_table()
pyarrow.Table
n_legs_uint: uint8
----
n_legs_uint: [[2,2,4,4,5,100]]
Filtering rows while scanning:
>>> dataset.scanner(filter=ds.field("year") > 2020).to_table()
pyarrow.Table
year: int64
n_legs: int64
animal: string
----
year: [[2022,2021,2022,2021]]
n_legs: [[2,4,4,100]]
animal: [["Parrot","Dog","Horse","Centipede"]]
- schema#
The common schema of the full Dataset
- sort_by(self, sorting, **kwargs)#
Sort the Dataset by one or multiple columns.
- Parameters:
- sorting : str or list[tuple(name, order)]
Name of the column to use to sort (ascending), or a list of multiple sorting conditions where each entry is a tuple with column name and sorting order (“ascending” or “descending”).
- **kwargs : dict, optional
Additional sorting options, as allowed by SortOptions.
- Returns:
- InMemoryDataset
A new dataset sorted according to the sort keys.
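Examples
A minimal sketch (added here for illustration, not from the original docstring) sorting in descending order:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'year': [2021, 2019, 2020]}))
>>> dataset.sort_by([("year", "descending")]).to_table()["year"].to_pylist()
[2021, 2020, 2019]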
- take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Select rows of data by index.
- Parameters:
- indices : Array or array-like
The indices of rows to select in the dataset.
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- table : Table
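Examples
A minimal sketch (added here for illustration, not from the original docstring) selecting the first and last rows by index:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'n_legs': [2, 4, 5, 100]}))
>>> dataset.take(pa.array([0, 3]))["n_legs"].to_pylist()
[2, 100]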
- to_batches(self, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Read the dataset as materialized record batches.
- Parameters:
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- record_batches : iterator of RecordBatch
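Examples
A minimal sketch (added here for illustration, not from the original docstring): batches stream through the scanner, so the total row count matches the dataset while each batch stays bounded by batch_size.
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'x': list(range(6))}))
>>> sum(batch.num_rows for batch in dataset.to_batches())
6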
- to_table(self, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)#
Read the dataset to an Arrow table.
Note that this method reads all the selected data from the dataset into memory.
- Parameters:
- columns : list of str, default None
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
- filter : Expression, default None
Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
- batch_size : int, default 131_072
The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this value can be reduced to shrink them.
- batch_readahead : int, default 16
The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
- fragment_scan_options : FragmentScanOptions, default None
Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
- use_threads : bool, default True
If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.
- memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the default pool.
- Returns:
- table : Table
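Examples
A minimal sketch (added here for illustration, not from the original docstring) combining a projection with a filter:
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(pa.table({'year': [2019, 2021], 'n_legs': [5, 100]}))
>>> dataset.to_table(columns=["year"], filter=ds.field("year") > 2019).to_pydict()
{'year': [2021]}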