pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
Expand description
A builder for reading parquet files from an async source as a ParquetRecordBatchStream.
This can be used to decode a Parquet file in streaming fashion (without downloading the whole file at once) from a remote source, such as an object store.
This builder handles reading the parquet file metadata, allowing consumers to use this information to select what specific columns, row groups, etc. they wish to be read by the resulting stream.
See examples on ParquetRecordBatchStreamBuilder::new, including how to
issue multiple I/O requests in parallel using multiple streams.
§See also:
- ParquetPushDecoderBuilder for lower level control over buffering and decoding.
- ParquetRecordBatchStream::next_row_group for I/O prefetching.
See ArrowReaderBuilder for additional member functions
Aliased Type§
pub struct ParquetRecordBatchStreamBuilder<T> {Show 14 fields
pub(crate) input: AsyncReader<T>,
pub(crate) metadata: Arc<ParquetMetaData>,
pub(crate) schema: Arc<Schema>,
pub(crate) fields: Option<Arc<ParquetField>>,
pub(crate) batch_size: usize,
pub(crate) row_groups: Option<Vec<usize>>,
pub(crate) projection: ProjectionMask,
pub(crate) filter: Option<RowFilter>,
pub(crate) selection: Option<RowSelection>,
pub(crate) row_selection_policy: RowSelectionPolicy,
pub(crate) limit: Option<usize>,
pub(crate) offset: Option<usize>,
pub(crate) metrics: ArrowReaderMetrics,
pub(crate) max_predicate_cache_size: usize,
}
Fields§
§input: AsyncReader<T>
The “input” to read parquet data from.
Note in the case of the ParquetPushDecoderBuilder there is no
underlying reader; the input is instead PushDecoderInput, the buffer that
caller-pushed bytes accumulate in.
§metadata: Arc<ParquetMetaData>
§schema: Arc<Schema>
§fields: Option<Arc<ParquetField>>
§batch_size: usize
§row_groups: Option<Vec<usize>>
§projection: ProjectionMask
§filter: Option<RowFilter>
§selection: Option<RowSelection>
§row_selection_policy: RowSelectionPolicy
§limit: Option<usize>
§offset: Option<usize>
§metrics: ArrowReaderMetrics
§max_predicate_cache_size: usize
Implementations§
Source§impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T>
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T>
Source
pub async fn new(input: T) -> Result<Self>
pub async fn new(input: T) -> Result<Self>
Create a new ParquetRecordBatchStreamBuilder for reading from the
specified source.
§Examples:
- Basic example reading from an async source
- Configuring options and reading metadata
- Reading Row Groups in Parallel
§Example
// Use tokio::fs::File to read data using an async I/O. This can be replaced with
// another async I/O reader such as a reader from an object store.
let file = tokio::fs::File::open(path).await.unwrap();
// Configure options for reading from the async source
let builder = ParquetRecordBatchStreamBuilder::new(file)
.await
.unwrap();
// Building the stream opens the parquet file (reads metadata, etc) and returns
// a stream that can be used to incrementally read the data in batches
let stream = builder.build().unwrap();
// In this example, we collect the stream into a Vec<RecordBatch>
// but real applications would likely process the batches as they are read
let results = stream.try_collect::<Vec<_>>().await.unwrap();
// Demonstrate the results are as expected
assert_batches_eq(
&results,
&[
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
"| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |",
"| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |",
"| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |",
"| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |",
"| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |",
"| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |",
"| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
],
);
§Example Configuring Options and Reading Metadata
There are many options that control the behavior of the reader, such as
with_batch_size, with_projection, with_filter, etc…
// As before, use tokio::fs::File to read data using an async I/O.
let file = tokio::fs::File::open(path).await.unwrap();
// Configure options for reading from the async source, in this case we set the batch size
// to 3 which produces 3 rows at a time.
let builder = ParquetRecordBatchStreamBuilder::new(file)
.await
.unwrap()
.with_batch_size(3);
// We can also read the metadata to inspect the schema and other metadata
// before actually reading the data
let file_metadata = builder.metadata().file_metadata();
// Specify that we only want to read the 2nd, 3rd, and 7th columns
// (root column indices are zero-based, so these are indices 1, 2, and 6)
let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
let stream = builder.with_projection(mask).build().unwrap();
let results = stream.try_collect::<Vec<_>>().await.unwrap();
// Verify the results are as expected
assert_batches_eq(
&results,
&[
"+----------+-------------+-----------+",
"| bool_col | tinyint_col | float_col |",
"+----------+-------------+-----------+",
"| true | 0 | 0.0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0.0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0.0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0.0 |",
"| false | 1 | 1.1 |",
"+----------+-------------+-----------+",
],
);
// There are 8 rows in total, so since we set the batch size to 3, we expect
// 3 batches: two with 3 rows each and a final batch with 2 rows.
assert_eq!(results.len(), 3);
§Example reading Row Groups in Parallel
Each ParquetRecordBatchStream is independent and can be used to read
from the same underlying source in parallel. Use
ParquetRecordBatchStream::next_row_group with a single stream to
begin prefetching the next Row Group. To read a file in parallel, create
a stream for each subset of the file. For example, you can read each
row group in parallel by creating a stream for each row group using the
ParquetRecordBatchStreamBuilder::with_row_groups API as shown below
// This example uses a tokio::fs::File as the async source, but it
// could be any async source, such as an object store reader.
let mut file = tokio::fs::File::open(path).await?;
// To read Row Groups in parallel, create a separate stream builder for each Row Group.
// First get the metadata to find the row group information
let file_size = file.metadata().await?.len();
let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups with 5 rows each
// Create a stream reader for each row group
let reader_metadata = ArrowReaderMetadata::try_new(
Arc::new(metadata),
ArrowReaderOptions::new()
)?;
let mut streams = vec![];
for row_group_index in 0..10 {
// Each stream needs its own source instance to issue
// parallel IO requests, so clone the file for each stream
let this_file = file.try_clone().await?;
let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
this_file,
reader_metadata.clone()
)
.with_row_groups(vec![row_group_index]) // read only this row group
.build()?;
streams.push(stream);
}
// Each reader can now be polled independently and in parallel, for
// example using StreamExt::buffered to read from 3 at a time
let results = futures::stream::iter(streams)
.map(|stream| async move { stream })
.buffered(3)
.flatten()
.try_collect::<Vec<_>>().await?;
// read all 50 rows (10 row groups x 5 rows per group)
assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
Source
pub async fn new_with_options(
input: T,
options: ArrowReaderOptions,
) -> Result<Self>
pub async fn new_with_options( input: T, options: ArrowReaderOptions, ) -> Result<Self>
Create a new ParquetRecordBatchStreamBuilder with the provided async source
and ArrowReaderOptions.
Source
pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self
pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self
Create a ParquetRecordBatchStreamBuilder from the provided ArrowReaderMetadata.
This allows loading metadata once and using it to create multiple builders with potentially different settings, that can be read in parallel.
§Example of reading from multiple streams in parallel
// open file with parquet data
let mut file = tokio::fs::File::from_std(file);
// load metadata once
let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
// create two readers, a and b, from the same underlying file
// without reading the metadata again
let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
file.try_clone().await.unwrap(),
meta.clone()
).build().unwrap();
let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
// Can read batches from both readers in parallel
assert_eq!(
a.next().await.unwrap().unwrap(),
b.next().await.unwrap().unwrap(),
);
Source
pub async fn get_row_group_column_bloom_filter(
&mut self,
row_group_idx: usize,
column_idx: usize,
) -> Result<Option<Sbbf>>
pub async fn get_row_group_column_bloom_filter( &mut self, row_group_idx: usize, column_idx: usize, ) -> Result<Option<Sbbf>>
Read bloom filter for a column in a row group
Returns None if the column does not have a bloom filter
This function should be called after other forms of pruning, such as projection and predicate pushdown.
Source
pub fn build(self) -> Result<ParquetRecordBatchStream<T>>
pub fn build(self) -> Result<ParquetRecordBatchStream<T>>
Build a new ParquetRecordBatchStream
See examples on ParquetRecordBatchStreamBuilder::new