

Type Alias ParquetRecordBatchStreamBuilder 

pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

A builder for reading Parquet files from an async source as a ParquetRecordBatchStream.

This can be used to decode a Parquet file in streaming fashion (without downloading the whole file at once) from a remote source, such as an object store.

This builder handles reading the parquet file metadata, allowing consumers to use this information to select what specific columns, row groups, etc. they wish to be read by the resulting stream.

See examples on ParquetRecordBatchStreamBuilder::new, including how to issue multiple I/O requests in parallel using multiple streams.

§See also:

See ArrowReaderBuilder for additional methods.

Aliased Type§

pub struct ParquetRecordBatchStreamBuilder<T> {
    pub(crate) input: AsyncReader<T>,
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: Arc<Schema>,
    pub(crate) fields: Option<Arc<ParquetField>>,
    pub(crate) batch_size: usize,
    pub(crate) row_groups: Option<Vec<usize>>,
    pub(crate) projection: ProjectionMask,
    pub(crate) filter: Option<RowFilter>,
    pub(crate) selection: Option<RowSelection>,
    pub(crate) row_selection_policy: RowSelectionPolicy,
    pub(crate) limit: Option<usize>,
    pub(crate) offset: Option<usize>,
    pub(crate) metrics: ArrowReaderMetrics,
    pub(crate) max_predicate_cache_size: usize,
}

Fields§

§input: AsyncReader<T>

The “input” to read parquet data from.

Note that in the case of ParquetPushDecoderBuilder there is no underlying reader; the input is instead a PushDecoderInput, the buffer in which caller-pushed bytes accumulate.

§metadata: Arc<ParquetMetaData>

§schema: Arc<Schema>

§fields: Option<Arc<ParquetField>>

§batch_size: usize

§row_groups: Option<Vec<usize>>

§projection: ProjectionMask

§filter: Option<RowFilter>

§selection: Option<RowSelection>

§row_selection_policy: RowSelectionPolicy

§limit: Option<usize>

§offset: Option<usize>

§metrics: ArrowReaderMetrics

§max_predicate_cache_size: usize

Implementations§

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T>

pub async fn new(input: T) -> Result<Self>

Create a new ParquetRecordBatchStreamBuilder for reading from the specified source.

§Examples:
§Example
// Use tokio::fs::File to read data using async I/O. This can be replaced with
// another async reader, such as a reader for an object store.
let file = tokio::fs::File::open(path).await.unwrap();

// Configure options for reading from the async source
let builder = ParquetRecordBatchStreamBuilder::new(file)
    .await
    .unwrap();
// `new` has already read the file metadata; building returns a stream
// that can be used to incrementally read the data in batches
let stream = builder.build().unwrap();
// In this example, we collect the stream into a Vec<RecordBatch>
// but real applications would likely process the batches as they are read
let results = stream.try_collect::<Vec<_>>().await.unwrap();
// Demonstrate the results are as expected
assert_batches_eq(
    &results,
    &[
      "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
      "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
      "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
      "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
      "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
      "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
      "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
      "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
      "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
      "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
      "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
      "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ],
);
§Example Configuring Options and Reading Metadata

There are many options that control the behavior of the reader, such as with_batch_size, with_projection, with_filter, etc…

// As before, use tokio::fs::File to read data using async I/O.
let file = tokio::fs::File::open(path).await.unwrap();

// Configure options for reading from the async source. Here we set the batch size
// to 3, so the stream produces at most 3 rows per batch.
let builder = ParquetRecordBatchStreamBuilder::new(file)
    .await
    .unwrap()
    .with_batch_size(3);

// We can also read the metadata to inspect the schema and other metadata
// before actually reading the data
let file_metadata = builder.metadata().file_metadata();
// Specify that we only want to read the root columns at zero-based indices
// 1, 2, and 6 (bool_col, tinyint_col, and float_col)
let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);

let stream = builder.with_projection(mask).build().unwrap();
let results = stream.try_collect::<Vec<_>>().await.unwrap();
// Check that the results are as expected
assert_batches_eq(
    &results,
    &[
        "+----------+-------------+-----------+",
        "| bool_col | tinyint_col | float_col |",
        "+----------+-------------+-----------+",
        "| true     | 0           | 0.0       |",
        "| false    | 1           | 1.1       |",
        "| true     | 0           | 0.0       |",
        "| false    | 1           | 1.1       |",
        "| true     | 0           | 0.0       |",
        "| false    | 1           | 1.1       |",
        "| true     | 0           | 0.0       |",
        "| false    | 1           | 1.1       |",
        "+----------+-------------+-----------+",
    ],
);

// The results contain 8 rows in total. Since we set the batch size to 3, we expect
// 3 batches: two with 3 rows each and a last batch with 2 rows.
assert_eq!(results.len(), 3);
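The batch count asserted above follows from ceiling division: 8 rows read 3 at a time give batches of 3, 3, and 2 rows. The following is a minimal standard-library sketch of that arithmetic. `batch_row_counts` is a hypothetical helper written for illustration, not part of the parquet crate, and it assumes every batch is full except possibly the last, as in this example.

```rust
/// Hypothetical helper (not part of the parquet crate): the number of rows in
/// each batch when `total_rows` rows are emitted `batch_size` rows at a time.
/// Assumes every batch is full except possibly the last one.
fn batch_row_counts(total_rows: usize, batch_size: usize) -> Vec<usize> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut counts = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        // Take a full batch, or whatever is left for the final batch.
        let n = remaining.min(batch_size);
        counts.push(n);
        remaining -= n;
    }
    counts
}

fn main() {
    // 8 rows with a batch size of 3, as in the example above.
    let counts = batch_row_counts(8, 3);
    println!("{} batches: {:?}", counts.len(), counts);
}
```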
§Example reading Row Groups in Parallel

Each ParquetRecordBatchStream is independent and can be used to read from the same underlying source in parallel. Use ParquetRecordBatchStream::next_row_group with a single stream to begin prefetching the next Row Group. To read a file in parallel, create a stream for each subset of the file. For example, you can read each row group in parallel by creating a stream for each row group using the ParquetRecordBatchStreamBuilder::with_row_groups API, as shown below:

// This example uses a tokio::fs::File as the async source, but it
// could be any async source, such as an object store reader
let mut file = tokio::fs::File::open(path).await?;
// To read Row Groups in parallel, create a separate stream builder for each Row Group.
// First get the metadata to find the row group information
let file_size = file.metadata().await?.len();
let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups with 5 rows each
// Create a stream reader for each row group
let reader_metadata = ArrowReaderMetadata::try_new(
    Arc::new(metadata),
    ArrowReaderOptions::new(),
)?;
let mut streams = vec![];
for row_group_index in 0..10 {
    // Each stream needs its own source instance to issue
    // parallel IO requests, so clone the file for each stream
    let this_file = file.try_clone().await?;
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
        this_file,
        reader_metadata.clone(),
    )
    .with_row_groups(vec![row_group_index]) // read only this row group
    .build()?;
    streams.push(stream);
}
// Each reader can now be polled independently and in parallel, for
// example using StreamExt::buffered to read from 3 at a time
let results = futures::stream::iter(streams)
    .map(|stream| async move { stream })
    .buffered(3)
    .flatten()
    .try_collect::<Vec<_>>()
    .await?;
// read all 50 rows (10 row groups x 5 rows per group)
assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
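The example above creates one stream per row group. Because with_row_groups takes a Vec<usize>, a stream can also be given several row groups, which may be preferable when a file has many more row groups than the desired number of concurrent streams. The following is a minimal standard-library sketch of one way to split the indices. `partition_row_groups` is a hypothetical helper for illustration, not part of the parquet crate, and the contiguous near-equal split is an assumption rather than a recommendation from the library.

```rust
/// Hypothetical helper (not part of the parquet crate): split the row group
/// indices `0..num_row_groups` into at most `num_streams` contiguous subsets
/// whose sizes differ by at most one. Empty subsets are omitted.
fn partition_row_groups(num_row_groups: usize, num_streams: usize) -> Vec<Vec<usize>> {
    assert!(num_streams > 0, "num_streams must be non-zero");
    let base = num_row_groups / num_streams;
    let extra = num_row_groups % num_streams;
    let mut parts = Vec::new();
    let mut start = 0;
    for i in 0..num_streams {
        // The first `extra` subsets take one additional row group.
        let len = base + usize::from(i < extra);
        if len == 0 {
            break;
        }
        parts.push((start..start + len).collect());
        start += len;
    }
    parts
}

fn main() {
    // 10 row groups spread over 3 streams, matching the file in the example above.
    for (i, row_groups) in partition_row_groups(10, 3).into_iter().enumerate() {
        // Each `row_groups` Vec could be passed to `with_row_groups` on its own builder.
        println!("stream {i}: {row_groups:?}");
    }
}
```

Each resulting subset would be passed to with_row_groups on a separate builder created with new_with_metadata, exactly as the single-index vectors are in the example above.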

pub async fn new_with_options(input: T, options: ArrowReaderOptions) -> Result<Self>

Create a new ParquetRecordBatchStreamBuilder with the provided async source and ArrowReaderOptions.

pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self

Create a ParquetRecordBatchStreamBuilder from the provided ArrowReaderMetadata.

This allows loading metadata once and using it to create multiple builders with potentially different settings, that can be read in parallel.

§Example of reading from multiple streams in parallel
// open file with parquet data
let mut file = tokio::fs::File::from_std(file);
// load metadata once
let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
// create two readers, a and b, from the same underlying file
// without reading the metadata again
let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
    file.try_clone().await.unwrap(),
    meta.clone()
).build().unwrap();
let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();

// Can read batches from both readers in parallel
assert_eq!(
  a.next().await.unwrap().unwrap(),
  b.next().await.unwrap().unwrap(),
);

pub async fn get_row_group_column_bloom_filter(&mut self, row_group_idx: usize, column_idx: usize) -> Result<Option<Sbbf>>

Read the bloom filter for a column in a row group.

Returns None if the column does not have a bloom filter.

Call this function after other forms of pruning, such as projection and predicate pushdown.
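A bloom filter can report false positives but never false negatives, so a negative probe means the row group can be skipped, while a missing filter (None) means it must still be read. The following is a minimal standard-library sketch of that decision, applied to row groups that survived earlier pruning. `row_groups_to_read` and its `probe` closure are hypothetical stand-ins for illustration: the closure models calling get_row_group_column_bloom_filter and checking the returned filter for a value, and is not part of the parquet crate.

```rust
/// Hypothetical helper (not part of the parquet crate): decide which candidate
/// row groups still need to be read after consulting bloom filters.
///
/// `probe(row_group)` models one bloom filter lookup:
/// - `None`        : the column has no bloom filter in that row group
/// - `Some(false)` : the value is definitely absent, so the row group can be skipped
/// - `Some(true)`  : the value may be present (false positives are possible)
fn row_groups_to_read(candidates: &[usize], probe: impl Fn(usize) -> Option<bool>) -> Vec<usize> {
    candidates
        .iter()
        .copied()
        // Keep the row group unless the filter proves the value is absent.
        .filter(|&rg| probe(rg).unwrap_or(true))
        .collect()
}

fn main() {
    // Row groups left over after other forms of pruning (made-up indices).
    let candidates = [1, 3, 4];
    // Made-up probe results: group 1 rules the value out, group 3 may contain
    // it, and group 4 has no bloom filter.
    let probe = |rg: usize| match rg {
        1 => Some(false),
        3 => Some(true),
        _ => None,
    };
    println!("{:?}", row_groups_to_read(&candidates, probe));
}
```

Probing only the surviving candidates avoids fetching bloom filters for row groups that cheaper pruning has already eliminated.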

pub fn build(self) -> Result<ParquetRecordBatchStream<T>>