parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups};
42use crate::arrow::arrow_reader::{
43    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
44    RowFilter, RowSelection,
45};
46use crate::arrow::ProjectionMask;
47
48use crate::bloom_filter::{
49    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
50};
51use crate::column::page::{PageIterator, PageReader};
52use crate::errors::{ParquetError, Result};
53use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
54use crate::file::page_index::offset_index::OffsetIndexMetaData;
55use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
56use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
57
58mod metadata;
59pub use metadata::*;
60
61#[cfg(feature = "object_store")]
62mod store;
63
64use crate::arrow::arrow_reader::ReadPlanBuilder;
65use crate::arrow::schema::ParquetField;
66#[cfg(feature = "object_store")]
67pub use store::*;
68
69/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
70///
71/// Notes:
72///
73/// 1. There is a default implementation for types that implement [`AsyncRead`]
74///    and [`AsyncSeek`], for example [`tokio::fs::File`].
75///
76/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
77///    is enabled, implements this interface for [`ObjectStore`].
78///
79/// [`ObjectStore`]: object_store::ObjectStore
80///
81/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
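///
/// # Example: an in-memory `AsyncFileReader`
///
/// A minimal sketch (not part of this crate) that serves a Parquet file held
/// entirely in memory as a [`Bytes`] buffer; a production implementation would
/// typically wrap a file, network, or object store source instead.
///
/// ```no_run
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// struct InMemoryReader {
///     data: Bytes,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Slice the requested byte range out of the in-memory buffer
///         let data = self.data.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         async move {
///             // Parse the footer from the buffered bytes; a real implementation
///             // would also honor `_options` (e.g. page index or decryption settings)
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&self.data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```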
82pub trait AsyncFileReader: Send {
83    /// Retrieve the bytes in `range`
84    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
85
86    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
87    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
88        async move {
89            let mut result = Vec::with_capacity(ranges.len());
90
91            for range in ranges.into_iter() {
92                let data = self.get_bytes(range).await?;
93                result.push(data);
94            }
95
96            Ok(result)
97        }
98        .boxed()
99    }
100
101    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
102    ///
103    /// This is an asynchronous operation as it may involve reading the file
104    /// footer and potentially other metadata from disk or a remote source.
105    ///
106    /// Reading data from Parquet requires the metadata to understand the
107    /// schema, row groups, and location of pages within the file. This metadata
108    /// is stored primarily in the footer of the Parquet file, and can be read using
109    /// [`ParquetMetaDataReader`].
110    ///
111    /// However, implementations can significantly speed up reading Parquet by
112    /// supplying cached metadata or pre-fetched metadata via this API.
113    ///
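    /// For example, an implementation that already holds the parsed metadata
    /// could return it without performing any I/O (a sketch, assuming a
    /// hypothetical wrapper type with a cached `metadata: Arc<ParquetMetaData>`
    /// field):
    ///
    /// ```ignore
    /// fn get_metadata<'a>(
    ///     &'a mut self,
    ///     _options: Option<&'a ArrowReaderOptions>,
    /// ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///     // Clone the cached Arc instead of re-reading the file footer
    ///     let metadata = Arc::clone(&self.metadata);
    ///     async move { Ok(metadata) }.boxed()
    /// }
    /// ```
    ///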
114    /// # Parameters
115    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
116    ///   and other options that affect how the metadata is read.
117    fn get_metadata<'a>(
118        &'a mut self,
119        options: Option<&'a ArrowReaderOptions>,
120    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
121}
122
/// Allows a `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
124impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
125    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
126        self.as_mut().get_bytes(range)
127    }
128
129    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
130        self.as_mut().get_byte_ranges(ranges)
131    }
132
133    fn get_metadata<'a>(
134        &'a mut self,
135        options: Option<&'a ArrowReaderOptions>,
136    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
137        self.as_mut().get_metadata(options)
138    }
139}
140
141impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
142    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
143        async move {
144            self.seek(SeekFrom::End(-(suffix as i64))).await?;
145            let mut buf = Vec::with_capacity(suffix);
146            self.take(suffix as _).read_to_end(&mut buf).await?;
147            Ok(buf.into())
148        }
149        .boxed()
150    }
151}
152
153impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
154    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
155        async move {
156            self.seek(SeekFrom::Start(range.start)).await?;
157
158            let to_read = range.end - range.start;
159            let mut buffer = Vec::with_capacity(to_read.try_into()?);
160            let read = self.take(to_read).read_to_end(&mut buffer).await?;
161            if read as u64 != to_read {
162                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
163            }
164
165            Ok(buffer.into())
166        }
167        .boxed()
168    }
169
170    fn get_metadata<'a>(
171        &'a mut self,
172        options: Option<&'a ArrowReaderOptions>,
173    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
174        async move {
175            let metadata_reader = ParquetMetaDataReader::new()
176                .with_page_indexes(options.is_some_and(|o| o.page_index));
177
178            #[cfg(feature = "encryption")]
179            let metadata_reader = metadata_reader.with_decryption_properties(
180                options.and_then(|o| o.file_decryption_properties.as_ref()),
181            );
182
183            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
184            Ok(Arc::new(parquet_metadata))
185        }
186        .boxed()
187    }
188}
189
190impl ArrowReaderMetadata {
    /// Returns a new [`ArrowReaderMetadata`] loaded from the provided [`AsyncFileReader`]
192    ///
193    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
194    pub async fn load_async<T: AsyncFileReader>(
195        input: &mut T,
196        options: ArrowReaderOptions,
197    ) -> Result<Self> {
198        let metadata = input.get_metadata(Some(&options)).await?;
199        Self::try_new(metadata, options)
200    }
201}
202
203#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
205///
206/// Allows sharing the same builder for both the sync and async versions, whilst also not
207/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
208pub struct AsyncReader<T>(T);
209
/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
211///
212/// This can be used to decode a Parquet file in streaming fashion (without
213/// downloading the whole file at once) from a remote source, such as an object store.
214///
215/// This builder handles reading the parquet file metadata, allowing consumers
216/// to use this information to select what specific columns, row groups, etc.
217/// they wish to be read by the resulting stream.
218///
219/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
220///
221/// See [`ArrowReaderBuilder`] for additional member functions
222pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
223
224impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
225    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
226    /// specified source.
227    ///
228    /// # Example
229    /// ```
230    /// # #[tokio::main(flavor="current_thread")]
231    /// # async fn main() {
232    /// #
233    /// # use arrow_array::RecordBatch;
234    /// # use arrow::util::pretty::pretty_format_batches;
235    /// # use futures::TryStreamExt;
236    /// #
237    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
238    /// #
239    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
240    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
241    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
242    /// #     assert_eq!(
243    /// #          &actual_lines, expected_lines,
244    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
245    /// #          expected_lines, actual_lines
246    /// #      );
247    /// #  }
248    /// #
249    /// # let testdata = arrow::util::test_util::parquet_test_data();
250    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
252    /// // another async I/O reader such as a reader from an object store.
253    /// let file = tokio::fs::File::open(path).await.unwrap();
254    ///
255    /// // Configure options for reading from the async source
256    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
257    ///     .await
258    ///     .unwrap();
259    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
260    /// // a stream that can be used to incrementally read the data in batches
261    /// let stream = builder.build().unwrap();
262    /// // In this example, we collect the stream into a Vec<RecordBatch>
263    /// // but real applications would likely process the batches as they are read
264    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
265    /// // Demonstrate the results are as expected
266    /// assert_batches_eq(
267    ///     &results,
268    ///     &[
269    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
270    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
271    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
272    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
273    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
274    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
275    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
276    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
277    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
278    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
279    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
280    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
281    ///      ],
282    ///  );
283    /// # }
284    /// ```
285    ///
286    /// # Example configuring options and reading metadata
287    ///
288    /// There are many options that control the behavior of the reader, such as
289    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
290    ///
291    /// ```
292    /// # #[tokio::main(flavor="current_thread")]
293    /// # async fn main() {
294    /// #
295    /// # use arrow_array::RecordBatch;
296    /// # use arrow::util::pretty::pretty_format_batches;
297    /// # use futures::TryStreamExt;
298    /// #
299    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
300    /// #
301    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
302    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
303    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
304    /// #     assert_eq!(
305    /// #          &actual_lines, expected_lines,
306    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
307    /// #          expected_lines, actual_lines
308    /// #      );
309    /// #  }
310    /// #
311    /// # let testdata = arrow::util::test_util::parquet_test_data();
312    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // As before, use tokio::fs::File to read data using async I/O.
314    /// let file = tokio::fs::File::open(path).await.unwrap();
315    ///
316    /// // Configure options for reading from the async source, in this case we set the batch size
317    /// // to 3 which produces 3 rows at a time.
318    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
319    ///     .await
320    ///     .unwrap()
321    ///     .with_batch_size(3);
322    ///
323    /// // We can also read the metadata to inspect the schema and other metadata
324    /// // before actually reading the data
325    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read the columns at indexes 1, 2, and 6
    /// // (bool_col, tinyint_col, and float_col)
327    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
328    ///
329    /// let stream = builder.with_projection(mask).build().unwrap();
330    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
331    /// // Print out the results
332    /// assert_batches_eq(
333    ///     &results,
334    ///     &[
335    ///         "+----------+-------------+-----------+",
336    ///         "| bool_col | tinyint_col | float_col |",
337    ///         "+----------+-------------+-----------+",
338    ///         "| true     | 0           | 0.0       |",
339    ///         "| false    | 1           | 1.1       |",
340    ///         "| true     | 0           | 0.0       |",
341    ///         "| false    | 1           | 1.1       |",
342    ///         "| true     | 0           | 0.0       |",
343    ///         "| false    | 1           | 1.1       |",
344    ///         "| true     | 0           | 0.0       |",
345    ///         "| false    | 1           | 1.1       |",
346    ///         "+----------+-------------+-----------+",
347    ///      ],
348    ///  );
349    ///
    /// // The result has 8 rows, so since we set the batch size to 3, we expect
    /// // 3 batches: two with 3 rows each and the last batch with 2 rows.
352    /// assert_eq!(results.len(), 3);
353    /// # }
354    /// ```
355    pub async fn new(input: T) -> Result<Self> {
356        Self::new_with_options(input, Default::default()).await
357    }
358
359    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
360    /// and [`ArrowReaderOptions`].
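    ///
    /// # Example
    ///
    /// A brief sketch, assuming `file` is an async Parquet source such as a
    /// `tokio::fs::File`:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// // Request the page index so page-level pruning information is available
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let _stream = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
    ///     .await?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```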
361    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
362        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
363        Ok(Self::new_with_metadata(input, metadata))
364    }
365
366    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
367    ///
368    /// This allows loading metadata once and using it to create multiple builders with
369    /// potentially different settings, that can be read in parallel.
370    ///
371    /// # Example of reading from multiple streams in parallel
372    ///
373    /// ```
374    /// # use std::fs::metadata;
375    /// # use std::sync::Arc;
376    /// # use bytes::Bytes;
377    /// # use arrow_array::{Int32Array, RecordBatch};
378    /// # use arrow_schema::{DataType, Field, Schema};
379    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
380    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
381    /// # use tempfile::tempfile;
382    /// # use futures::StreamExt;
383    /// # #[tokio::main(flavor="current_thread")]
384    /// # async fn main() {
385    /// #
386    /// # let mut file = tempfile().unwrap();
387    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
388    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
389    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
390    /// # writer.write(&batch).unwrap();
391    /// # writer.close().unwrap();
392    /// // open file with parquet data
393    /// let mut file = tokio::fs::File::from_std(file);
394    /// // load metadata once
395    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
396    /// // create two readers, a and b, from the same underlying file
397    /// // without reading the metadata again
398    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
399    ///     file.try_clone().await.unwrap(),
400    ///     meta.clone()
401    /// ).build().unwrap();
402    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
403    ///
404    /// // Can read batches from both readers in parallel
405    /// assert_eq!(
406    ///   a.next().await.unwrap().unwrap(),
407    ///   b.next().await.unwrap().unwrap(),
408    /// );
409    /// # }
410    /// ```
411    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
412        Self::new_builder(AsyncReader(input), metadata)
413    }
414
415    /// Read bloom filter for a column in a row group
416    ///
417    /// Returns `None` if the column does not have a bloom filter
418    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
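    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `file` is an async Parquet source (such as a
    /// `tokio::fs::File`) written with bloom filters enabled, and that
    /// `"some_value"` is a hypothetical value to probe for:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// // Probe the bloom filter of column 0 in row group 0
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     if !sbbf.check(&"some_value") {
    ///         // The value is definitely not present, so this row group
    ///         // could be skipped entirely when scanning for it
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```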
420    pub async fn get_row_group_column_bloom_filter(
421        &mut self,
422        row_group_idx: usize,
423        column_idx: usize,
424    ) -> Result<Option<Sbbf>> {
425        let metadata = self.metadata.row_group(row_group_idx);
426        let column_metadata = metadata.column(column_idx);
427
428        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
429            offset
430                .try_into()
431                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
432        } else {
433            return Ok(None);
434        };
435
436        let buffer = match column_metadata.bloom_filter_length() {
437            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
438            None => self
439                .input
440                .0
441                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
442        }
443        .await?;
444
445        let (header, bitset_offset) =
446            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
447
448        match header.algorithm {
449            BloomFilterAlgorithm::BLOCK(_) => {
450                // this match exists to future proof the singleton algorithm enum
451            }
452        }
453        match header.compression {
454            BloomFilterCompression::UNCOMPRESSED(_) => {
455                // this match exists to future proof the singleton compression enum
456            }
457        }
458        match header.hash {
459            BloomFilterHash::XXHASH(_) => {
460                // this match exists to future proof the singleton hash enum
461            }
462        }
463
464        let bitset = match column_metadata.bloom_filter_length() {
465            Some(_) => buffer.slice(
466                (TryInto::<usize>::try_into(bitset_offset).unwrap()
467                    - TryInto::<usize>::try_into(offset).unwrap())..,
468            ),
469            None => {
470                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
471                    ParquetError::General("Bloom filter length is invalid".to_string())
472                })?;
473                self.input
474                    .0
475                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
476                    .await?
477            }
478        };
479        Ok(Some(Sbbf::new(&bitset)))
480    }
481
482    /// Build a new [`ParquetRecordBatchStream`]
483    ///
484    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
485    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
486        let num_row_groups = self.metadata.row_groups().len();
487
488        let row_groups = match self.row_groups {
489            Some(row_groups) => {
490                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
491                    return Err(general_err!(
492                        "row group {} out of bounds 0..{}",
493                        col,
494                        num_row_groups
495                    ));
496                }
497                row_groups.into()
498            }
499            None => (0..self.metadata.row_groups().len()).collect(),
500        };
501
        // Try to avoid allocating a large buffer
503        let batch_size = self
504            .batch_size
505            .min(self.metadata.file_metadata().num_rows() as usize);
506        let reader_factory = ReaderFactory {
507            input: self.input.0,
508            filter: self.filter,
509            metadata: self.metadata.clone(),
510            fields: self.fields,
511            limit: self.limit,
512            offset: self.offset,
513        };
514
515        // Ensure schema of ParquetRecordBatchStream respects projection, and does
516        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
517        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
518            Some(DataType::Struct(fields)) => {
519                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
520            }
521            None => Fields::empty(),
522            _ => unreachable!("Must be Struct for root type"),
523        };
524        let schema = Arc::new(Schema::new(projected_fields));
525
526        Ok(ParquetRecordBatchStream {
527            metadata: self.metadata,
528            batch_size,
529            row_groups,
530            projection: self.projection,
531            selection: self.selection,
532            schema,
533            reader_factory: Some(reader_factory),
534            state: StreamState::Init,
535        })
536    }
537}
538
539/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
540///
/// Note: If all rows are filtered out in the row group (e.g. by filters, limit or
542/// offset), returns `None` for the reader.
543type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
544
545/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
546/// [`ParquetRecordBatchReader`]
547struct ReaderFactory<T> {
548    metadata: Arc<ParquetMetaData>,
549
550    /// Top level parquet schema
551    fields: Option<Arc<ParquetField>>,
552
553    input: T,
554
555    /// Optional filter
556    filter: Option<RowFilter>,
557
558    /// Limit to apply to remaining row groups.  
559    limit: Option<usize>,
560
    /// Offset to apply to the next row group read
562    offset: Option<usize>,
563}
564
565impl<T> ReaderFactory<T>
566where
567    T: AsyncFileReader + Send,
568{
569    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
570    ///
571    /// Updates the `limit` and `offset` of the reader factory
572    ///
573    /// Note: this captures self so that the resulting future has a static lifetime
574    async fn read_row_group(
575        mut self,
576        row_group_idx: usize,
577        selection: Option<RowSelection>,
578        projection: ProjectionMask,
579        batch_size: usize,
580    ) -> ReadResult<T> {
581        // TODO: calling build_array multiple times is wasteful
582
583        let meta = self.metadata.row_group(row_group_idx);
584        let offset_index = self
585            .metadata
586            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
588            .filter(|index| !index.is_empty())
589            .map(|x| x[row_group_idx].as_slice());
590
591        let mut row_group = InMemoryRowGroup {
592            // schema: meta.schema_descr_ptr(),
593            row_count: meta.num_rows() as usize,
594            column_chunks: vec![None; meta.columns().len()],
595            offset_index,
596            row_group_idx,
597            metadata: self.metadata.as_ref(),
598        };
599
600        let filter = self.filter.as_mut();
601        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
602
603        // Update selection based on any filters
604        if let Some(filter) = filter {
605            for predicate in filter.predicates.iter_mut() {
606                if !plan_builder.selects_any() {
607                    return Ok((self, None)); // ruled out entire row group
608                }
609
610                // (pre) Fetch only the columns that are selected by the predicate
611                let selection = plan_builder.selection();
612                row_group
613                    .fetch(&mut self.input, predicate.projection(), selection)
614                    .await?;
615
616                let array_reader = ArrayReaderBuilder::new(&row_group)
617                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
618
619                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
620            }
621        }
622
623        // Compute the number of rows in the selection before applying limit and offset
624        let rows_before = plan_builder
625            .num_rows_selected()
626            .unwrap_or(row_group.row_count);
627
628        if rows_before == 0 {
629            return Ok((self, None)); // ruled out entire row group
630        }
631
632        // Apply any limit and offset
633        let plan_builder = plan_builder
634            .limited(row_group.row_count)
635            .with_offset(self.offset)
636            .with_limit(self.limit)
637            .build_limited();
638
639        let rows_after = plan_builder
640            .num_rows_selected()
641            .unwrap_or(row_group.row_count);
642
643        // Update running offset and limit for after the current row group is read
644        if let Some(offset) = &mut self.offset {
            // Reduction is either because of offset or limit; as limit is applied
            // after offset has been "exhausted", we can just use saturating_sub here
647            *offset = offset.saturating_sub(rows_before - rows_after)
648        }
649
650        if rows_after == 0 {
651            return Ok((self, None)); // ruled out entire row group
652        }
653
654        if let Some(limit) = &mut self.limit {
655            *limit -= rows_after;
656        }
657        // fetch the pages needed for decoding
658        row_group
659            .fetch(&mut self.input, &projection, plan_builder.selection())
660            .await?;
661
662        let plan = plan_builder.build();
663
664        let array_reader = ArrayReaderBuilder::new(&row_group)
665            .build_array_reader(self.fields.as_deref(), &projection)?;
666
667        let reader = ParquetRecordBatchReader::new(array_reader, plan);
668
669        Ok((self, Some(reader)))
670    }
671}
672
673enum StreamState<T> {
674    /// At the start of a new row group, or the end of the parquet stream
675    Init,
676    /// Decoding a batch
677    Decoding(ParquetRecordBatchReader),
678    /// Reading data from input
679    Reading(BoxFuture<'static, ReadResult<T>>),
680    /// Error
681    Error,
682}
683
684impl<T> std::fmt::Debug for StreamState<T> {
685    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
686        match self {
687            StreamState::Init => write!(f, "StreamState::Init"),
688            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
689            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
690            StreamState::Error => write!(f, "StreamState::Error"),
691        }
692    }
693}
694
/// An asynchronous [`Stream`] of [`RecordBatch`]es constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
696///
697/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
698/// allowing users to decode record batches separately from I/O.
699///
700/// # I/O Buffering
701///
702/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection + filtering, etc.) and decodes the rows from those buffered pages.
704///
705/// For example, if all rows and columns are selected, the entire row group is
706/// buffered in memory during decode. This minimizes the number of IO operations
707/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds.
///
711/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
712pub struct ParquetRecordBatchStream<T> {
713    metadata: Arc<ParquetMetaData>,
714
715    schema: SchemaRef,
716
717    row_groups: VecDeque<usize>,
718
719    projection: ProjectionMask,
720
721    batch_size: usize,
722
723    selection: Option<RowSelection>,
724
725    /// This is an option so it can be moved into a future
726    reader_factory: Option<ReaderFactory<T>>,
727
728    state: StreamState<T>,
729}
730
731impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
732    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
733        f.debug_struct("ParquetRecordBatchStream")
734            .field("metadata", &self.metadata)
735            .field("schema", &self.schema)
736            .field("batch_size", &self.batch_size)
737            .field("projection", &self.projection)
738            .field("state", &self.state)
739            .finish()
740    }
741}
742
743impl<T> ParquetRecordBatchStream<T> {
744    /// Returns the projected [`SchemaRef`] for reading the parquet file.
745    ///
746    /// Note that the schema metadata will be stripped here. See
747    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
748    pub fn schema(&self) -> &SchemaRef {
749        &self.schema
750    }
751}
752
753impl<T> ParquetRecordBatchStream<T>
754where
755    T: AsyncFileReader + Unpin + Send + 'static,
756{
757    /// Fetches the next row group from the stream.
758    ///
759    /// Users can continue to call this function to get row groups and decode them concurrently.
760    ///
761    /// ## Notes
762    ///
763    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
764    ///
765    /// ## Returns
766    ///
767    /// - `Ok(None)` if the stream has ended.
768    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
769    /// - `Ok(Some(reader))` which holds all the data for the row group.
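    ///
    /// ## Example
    ///
    /// A minimal sketch of decoding row groups as they are fetched, assuming
    /// `file` is an async Parquet source such as a `tokio::fs::File`:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// // Fetch the data for one row group at a time; each returned reader then
    /// // decodes the buffered pages synchronously, separately from further I/O
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     for batch in reader {
    ///         let batch = batch.unwrap();
    ///         println!("read a batch of {} rows", batch.num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```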
770    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
771        loop {
772            match &mut self.state {
773                StreamState::Decoding(_) | StreamState::Reading(_) => {
774                    return Err(ParquetError::General(
775                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
776                    ))
777                }
778                StreamState::Init => {
779                    let row_group_idx = match self.row_groups.pop_front() {
780                        Some(idx) => idx,
781                        None => return Ok(None),
782                    };
783
784                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
785
786                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
787
788                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
789
790                    let (reader_factory, maybe_reader) = reader_factory
791                        .read_row_group(
792                            row_group_idx,
793                            selection,
794                            self.projection.clone(),
795                            self.batch_size,
796                        )
797                        .await
798                        .inspect_err(|_| {
799                            self.state = StreamState::Error;
800                        })?;
801                    self.reader_factory = Some(reader_factory);
802
803                    if let Some(reader) = maybe_reader {
804                        return Ok(Some(reader));
805                    } else {
806                        // All rows skipped, read next row group
807                        continue;
808                    }
809                }
810                StreamState::Error => return Ok(None), // Ends the stream as error happens.
811            }
812        }
813    }
814}
815
816impl<T> Stream for ParquetRecordBatchStream<T>
817where
818    T: AsyncFileReader + Unpin + Send + 'static,
819{
820    type Item = Result<RecordBatch>;
821
822    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
823        loop {
824            match &mut self.state {
825                StreamState::Decoding(batch_reader) => match batch_reader.next() {
826                    Some(Ok(batch)) => {
827                        return Poll::Ready(Some(Ok(batch)));
828                    }
829                    Some(Err(e)) => {
830                        self.state = StreamState::Error;
831                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
832                    }
833                    None => self.state = StreamState::Init,
834                },
835                StreamState::Init => {
836                    let row_group_idx = match self.row_groups.pop_front() {
837                        Some(idx) => idx,
838                        None => return Poll::Ready(None),
839                    };
840
841                    let reader = self.reader_factory.take().expect("lost reader factory");
842
843                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
844
845                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
846
847                    let fut = reader
848                        .read_row_group(
849                            row_group_idx,
850                            selection,
851                            self.projection.clone(),
852                            self.batch_size,
853                        )
854                        .boxed();
855
856                    self.state = StreamState::Reading(fut)
857                }
858                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
859                    Ok((reader_factory, maybe_reader)) => {
860                        self.reader_factory = Some(reader_factory);
861                        match maybe_reader {
862                            // Read records from [`ParquetRecordBatchReader`]
863                            Some(reader) => self.state = StreamState::Decoding(reader),
864                            // All rows skipped, read next row group
865                            None => self.state = StreamState::Init,
866                        }
867                    }
868                    Err(e) => {
869                        self.state = StreamState::Error;
870                        return Poll::Ready(Some(Err(e)));
871                    }
872                },
873                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
874            }
875        }
876    }
877}
878
879/// An in-memory collection of column chunks
880struct InMemoryRowGroup<'a> {
881    offset_index: Option<&'a [OffsetIndexMetaData]>,
882    /// Column chunks for this row group
883    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
884    row_count: usize,
885    row_group_idx: usize,
886    metadata: &'a ParquetMetaData,
887}
888
889impl InMemoryRowGroup<'_> {
890    /// Fetches any additional column data specified in `projection` that is not already
891    /// present in `self.column_chunks`.
892    ///
893    /// If `selection` is provided, only the pages required for the selection
894    /// are fetched. Otherwise, all pages are fetched.
895    async fn fetch<T: AsyncFileReader + Send>(
896        &mut self,
897        input: &mut T,
898        projection: &ProjectionMask,
899        selection: Option<&RowSelection>,
900    ) -> Result<()> {
901        let metadata = self.metadata.row_group(self.row_group_idx);
902        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
903            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
904            // `RowSelection`
905            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
906
907            let fetch_ranges = self
908                .column_chunks
909                .iter()
910                .zip(metadata.columns())
911                .enumerate()
912                .filter(|&(idx, (chunk, _chunk_meta))| {
913                    chunk.is_none() && projection.leaf_included(idx)
914                })
915                .flat_map(|(idx, (_chunk, chunk_meta))| {
916                    // If the first page does not start at the beginning of the column,
917                    // then we need to also fetch a dictionary page.
918                    let mut ranges: Vec<Range<u64>> = vec![];
919                    let (start, _len) = chunk_meta.byte_range();
920                    match offset_index[idx].page_locations.first() {
921                        Some(first) if first.offset as u64 != start => {
922                            ranges.push(start..first.offset as u64);
923                        }
924                        _ => (),
925                    }
926
927                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
928                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
929
930                    ranges
931                })
932                .collect();
933
934            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
935            let mut page_start_offsets = page_start_offsets.into_iter();
936
937            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
938                if chunk.is_some() || !projection.leaf_included(idx) {
939                    continue;
940                }
941
942                if let Some(offsets) = page_start_offsets.next() {
943                    let mut chunks = Vec::with_capacity(offsets.len());
944                    for _ in 0..offsets.len() {
945                        chunks.push(chunk_data.next().unwrap());
946                    }
947
948                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
949                        length: metadata.column(idx).byte_range().1 as usize,
950                        data: offsets
951                            .into_iter()
952                            .map(|x| x as usize)
953                            .zip(chunks.into_iter())
954                            .collect(),
955                    }))
956                }
957            }
958        } else {
959            let fetch_ranges = self
960                .column_chunks
961                .iter()
962                .enumerate()
963                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
964                .map(|(idx, _chunk)| {
965                    let column = metadata.column(idx);
966                    let (start, length) = column.byte_range();
967                    start..(start + length)
968                })
969                .collect();
970
971            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
972
973            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
974                if chunk.is_some() || !projection.leaf_included(idx) {
975                    continue;
976                }
977
978                if let Some(data) = chunk_data.next() {
979                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
980                        offset: metadata.column(idx).byte_range().0 as usize,
981                        data,
982                    }));
983                }
984            }
985        }
986
987        Ok(())
988    }
989}
990
991impl RowGroups for InMemoryRowGroup<'_> {
992    fn num_rows(&self) -> usize {
993        self.row_count
994    }
995
996    /// Return chunks for column i
997    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
998        match &self.column_chunks[i] {
999            None => Err(ParquetError::General(format!(
1000                "Invalid column index {i}, column was not fetched"
1001            ))),
1002            Some(data) => {
1003                let page_locations = self
1004                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
1006                    .filter(|index| !index.is_empty())
1007                    .map(|index| index[i].page_locations.clone());
1008                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1009                let page_reader = SerializedPageReader::new(
1010                    data.clone(),
1011                    column_chunk_metadata,
1012                    self.row_count,
1013                    page_locations,
1014                )?;
1015                let page_reader = page_reader.add_crypto_context(
1016                    self.row_group_idx,
1017                    i,
1018                    self.metadata,
1019                    column_chunk_metadata,
1020                )?;
1021
1022                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1023
1024                Ok(Box::new(ColumnChunkIterator {
1025                    reader: Some(Ok(page_reader)),
1026                }))
1027            }
1028        }
1029    }
1030}
1031
1032/// An in-memory column chunk
1033#[derive(Clone)]
1034enum ColumnChunkData {
1035    /// Column chunk data representing only a subset of data pages
1036    Sparse {
1037        /// Length of the full column chunk
1038        length: usize,
1039        /// Subset of data pages included in this sparse chunk.
1040        ///
1041        /// Each element is a tuple of (page offset within file, page data).
1042        /// Each entry is a complete page and the list is ordered by offset.
1043        data: Vec<(usize, Bytes)>,
1044    },
1045    /// Full column chunk and the offset within the original file
1046    Dense { offset: usize, data: Bytes },
1047}
1048
1049impl ColumnChunkData {
1050    /// Return the data for this column chunk at the given offset
1051    fn get(&self, start: u64) -> Result<Bytes> {
1052        match &self {
1053            ColumnChunkData::Sparse { data, .. } => data
1054                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1055                .map(|idx| data[idx].1.clone())
1056                .map_err(|_| {
1057                    ParquetError::General(format!(
1058                        "Invalid offset in sparse column chunk data: {start}"
1059                    ))
1060                }),
1061            ColumnChunkData::Dense { offset, data } => {
1062                let start = start as usize - *offset;
1063                Ok(data.slice(start..))
1064            }
1065        }
1066    }
1067}
1068
1069impl Length for ColumnChunkData {
1070    /// Return the total length of the full column chunk
1071    fn len(&self) -> u64 {
1072        match &self {
1073            ColumnChunkData::Sparse { length, .. } => *length as u64,
1074            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1075        }
1076    }
1077}
1078
1079impl ChunkReader for ColumnChunkData {
1080    type T = bytes::buf::Reader<Bytes>;
1081
1082    fn get_read(&self, start: u64) -> Result<Self::T> {
1083        Ok(self.get(start)?.reader())
1084    }
1085
1086    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1087        Ok(self.get(start)?.slice(..length))
1088    }
1089}
1090
1091/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1092struct ColumnChunkIterator {
1093    reader: Option<Result<Box<dyn PageReader>>>,
1094}
1095
1096impl Iterator for ColumnChunkIterator {
1097    type Item = Result<Box<dyn PageReader>>;
1098
1099    fn next(&mut self) -> Option<Self::Item> {
1100        self.reader.take()
1101    }
1102}
1103
1104impl PageIterator for ColumnChunkIterator {}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109    use crate::arrow::arrow_reader::{
1110        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1111    };
1112    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1113    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1114    use crate::arrow::ArrowWriter;
1115    use crate::file::metadata::ParquetMetaDataReader;
1116    use crate::file::properties::WriterProperties;
1117    use arrow::compute::kernels::cmp::eq;
1118    use arrow::error::Result as ArrowResult;
1119    use arrow_array::builder::{ListBuilder, StringBuilder};
1120    use arrow_array::cast::AsArray;
1121    use arrow_array::types::Int32Type;
1122    use arrow_array::{
1123        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1124        StructArray, UInt64Array,
1125    };
1126    use arrow_schema::{DataType, Field, Schema};
1127    use futures::{StreamExt, TryStreamExt};
1128    use rand::{rng, Rng};
1129    use std::collections::HashMap;
1130    use std::sync::{Arc, Mutex};
1131    use tempfile::tempfile;
1132
1133    #[derive(Clone)]
1134    struct TestReader {
1135        data: Bytes,
1136        metadata: Option<Arc<ParquetMetaData>>,
1137        requests: Arc<Mutex<Vec<Range<usize>>>>,
1138    }
1139
1140    impl TestReader {
1141        fn new(data: Bytes) -> Self {
1142            Self {
1143                data,
1144                metadata: Default::default(),
1145                requests: Default::default(),
1146            }
1147        }
1148    }
1149
1150    impl AsyncFileReader for TestReader {
1151        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1152            let range = range.clone();
1153            self.requests
1154                .lock()
1155                .unwrap()
1156                .push(range.start as usize..range.end as usize);
1157            futures::future::ready(Ok(self
1158                .data
1159                .slice(range.start as usize..range.end as usize)))
1160            .boxed()
1161        }
1162
1163        fn get_metadata<'a>(
1164            &'a mut self,
1165            options: Option<&'a ArrowReaderOptions>,
1166        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1167            let metadata_reader = ParquetMetaDataReader::new()
1168                .with_page_indexes(options.is_some_and(|o| o.page_index));
1169            self.metadata = Some(Arc::new(
1170                metadata_reader.parse_and_finish(&self.data).unwrap(),
1171            ));
1172            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1173        }
1174    }
1175
1176    #[tokio::test]
1177    async fn test_async_reader() {
1178        let testdata = arrow::util::test_util::parquet_test_data();
1179        let path = format!("{testdata}/alltypes_plain.parquet");
1180        let data = Bytes::from(std::fs::read(path).unwrap());
1181
1182        let async_reader = TestReader::new(data.clone());
1183
1184        let requests = async_reader.requests.clone();
1185        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1186            .await
1187            .unwrap();
1188
1189        let metadata = builder.metadata().clone();
1190        assert_eq!(metadata.num_row_groups(), 1);
1191
1192        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1193        let stream = builder
1194            .with_projection(mask.clone())
1195            .with_batch_size(1024)
1196            .build()
1197            .unwrap();
1198
1199        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1200
1201        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1202            .unwrap()
1203            .with_projection(mask)
1204            .with_batch_size(104)
1205            .build()
1206            .unwrap()
1207            .collect::<ArrowResult<Vec<_>>>()
1208            .unwrap();
1209
1210        assert_eq!(async_batches, sync_batches);
1211
1212        let requests = requests.lock().unwrap();
1213        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1214        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1215
1216        assert_eq!(
1217            &requests[..],
1218            &[
1219                offset_1 as usize..(offset_1 + length_1) as usize,
1220                offset_2 as usize..(offset_2 + length_2) as usize
1221            ]
1222        );
1223    }
1224
1225    #[tokio::test]
1226    async fn test_async_reader_with_next_row_group() {
1227        let testdata = arrow::util::test_util::parquet_test_data();
1228        let path = format!("{testdata}/alltypes_plain.parquet");
1229        let data = Bytes::from(std::fs::read(path).unwrap());
1230
1231        let async_reader = TestReader::new(data.clone());
1232
1233        let requests = async_reader.requests.clone();
1234        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1235            .await
1236            .unwrap();
1237
1238        let metadata = builder.metadata().clone();
1239        assert_eq!(metadata.num_row_groups(), 1);
1240
1241        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1242        let mut stream = builder
1243            .with_projection(mask.clone())
1244            .with_batch_size(1024)
1245            .build()
1246            .unwrap();
1247
1248        let mut readers = vec![];
1249        while let Some(reader) = stream.next_row_group().await.unwrap() {
1250            readers.push(reader);
1251        }
1252
1253        let async_batches: Vec<_> = readers
1254            .into_iter()
1255            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1256            .collect();
1257
1258        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1259            .unwrap()
1260            .with_projection(mask)
1261            .with_batch_size(104)
1262            .build()
1263            .unwrap()
1264            .collect::<ArrowResult<Vec<_>>>()
1265            .unwrap();
1266
1267        assert_eq!(async_batches, sync_batches);
1268
1269        let requests = requests.lock().unwrap();
1270        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1271        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1272
1273        assert_eq!(
1274            &requests[..],
1275            &[
1276                offset_1 as usize..(offset_1 + length_1) as usize,
1277                offset_2 as usize..(offset_2 + length_2) as usize
1278            ]
1279        );
1280    }
1281
1282    #[tokio::test]
1283    async fn test_async_reader_with_index() {
1284        let testdata = arrow::util::test_util::parquet_test_data();
1285        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1286        let data = Bytes::from(std::fs::read(path).unwrap());
1287
1288        let async_reader = TestReader::new(data.clone());
1289
1290        let options = ArrowReaderOptions::new().with_page_index(true);
1291        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1292            .await
1293            .unwrap();
1294
1295        // The builder should have page and offset indexes loaded now
1296        let metadata_with_index = builder.metadata();
1297        assert_eq!(metadata_with_index.num_row_groups(), 1);
1298
1299        // Check offset indexes are present for all columns
1300        let offset_index = metadata_with_index.offset_index().unwrap();
1301        let column_index = metadata_with_index.column_index().unwrap();
1302
1303        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1304        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1305
1306        let num_columns = metadata_with_index
1307            .file_metadata()
1308            .schema_descr()
1309            .num_columns();
1310
1311        // Check page indexes are present for all columns
1312        offset_index
1313            .iter()
1314            .for_each(|x| assert_eq!(x.len(), num_columns));
1315        column_index
1316            .iter()
1317            .for_each(|x| assert_eq!(x.len(), num_columns));
1318
1319        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1320        let stream = builder
1321            .with_projection(mask.clone())
1322            .with_batch_size(1024)
1323            .build()
1324            .unwrap();
1325
1326        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1327
1328        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1329            .unwrap()
1330            .with_projection(mask)
1331            .with_batch_size(1024)
1332            .build()
1333            .unwrap()
1334            .collect::<ArrowResult<Vec<_>>>()
1335            .unwrap();
1336
1337        assert_eq!(async_batches, sync_batches);
1338    }
1339
1340    #[tokio::test]
1341    async fn test_async_reader_with_limit() {
1342        let testdata = arrow::util::test_util::parquet_test_data();
1343        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1344        let data = Bytes::from(std::fs::read(path).unwrap());
1345
1346        let metadata = ParquetMetaDataReader::new()
1347            .parse_and_finish(&data)
1348            .unwrap();
1349        let metadata = Arc::new(metadata);
1350
1351        assert_eq!(metadata.num_row_groups(), 1);
1352
1353        let async_reader = TestReader::new(data.clone());
1354
1355        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1356            .await
1357            .unwrap();
1358
1359        assert_eq!(builder.metadata().num_row_groups(), 1);
1360
1361        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1362        let stream = builder
1363            .with_projection(mask.clone())
1364            .with_batch_size(1024)
1365            .with_limit(1)
1366            .build()
1367            .unwrap();
1368
1369        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1370
1371        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1372            .unwrap()
1373            .with_projection(mask)
1374            .with_batch_size(1024)
1375            .with_limit(1)
1376            .build()
1377            .unwrap()
1378            .collect::<ArrowResult<Vec<_>>>()
1379            .unwrap();
1380
1381        assert_eq!(async_batches, sync_batches);
1382    }
1383
1384    #[tokio::test]
1385    async fn test_async_reader_skip_pages() {
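        // A RowSelection that skips and selects across page boundaries should
        // produce the same batches from the async stream as from the synchronous reader.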
1386        let testdata = arrow::util::test_util::parquet_test_data();
1387        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1388        let data = Bytes::from(std::fs::read(path).unwrap());
1389
1390        let async_reader = TestReader::new(data.clone());
1391
1392        let options = ArrowReaderOptions::new().with_page_index(true);
1393        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1394            .await
1395            .unwrap();
1396
1397        assert_eq!(builder.metadata().num_row_groups(), 1);
1398
1399        let selection = RowSelection::from(vec![
1400            RowSelector::skip(21),   // Skip first page
1401            RowSelector::select(21), // Select page to boundary
1402            RowSelector::skip(41),   // Skip multiple pages
1403            RowSelector::select(41), // Select multiple pages
1404            RowSelector::skip(25),   // Skip page across boundary
1405            RowSelector::select(25), // Select across page boundary
1406            RowSelector::skip(7116), // Skip to final page boundary
1407            RowSelector::select(10), // Select final page
1408        ]);
1409
1410        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1411
1412        let stream = builder
1413            .with_projection(mask.clone())
1414            .with_row_selection(selection.clone())
1415            .build()
1416            .expect("building stream");
1417
1418        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1419
1420        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1421            .unwrap()
1422            .with_projection(mask)
1423            .with_batch_size(1024)
1424            .with_row_selection(selection)
1425            .build()
1426            .unwrap()
1427            .collect::<ArrowResult<Vec<_>>>()
1428            .unwrap();
1429
1430        assert_eq!(async_batches, sync_batches);
1431    }
1432
1433    #[tokio::test]
1434    async fn test_fuzz_async_reader_selection() {
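        // Fuzz test: build 100 random alternating skip/select selections over the
        // 7300-row file and check the stream returns exactly the selected row count.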
1435        let testdata = arrow::util::test_util::parquet_test_data();
1436        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1437        let data = Bytes::from(std::fs::read(path).unwrap());
1438
1439        let mut rand = rng();
1440
1441        for _ in 0..100 {
1442            let mut expected_rows = 0;
1443            let mut total_rows = 0;
1444            let mut skip = false;
1445            let mut selectors = vec![];
1446
1447            while total_rows < 7300 {
1448                let row_count: usize = rand.random_range(1..100);
1449
1450                let row_count = row_count.min(7300 - total_rows);
1451
1452                selectors.push(RowSelector { row_count, skip });
1453
1454                total_rows += row_count;
1455                if !skip {
1456                    expected_rows += row_count;
1457                }
1458
1459                skip = !skip;
1460            }
1461
1462            let selection = RowSelection::from(selectors);
1463
1464            let async_reader = TestReader::new(data.clone());
1465
1466            let options = ArrowReaderOptions::new().with_page_index(true);
1467            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1468                .await
1469                .unwrap();
1470
1471            assert_eq!(builder.metadata().num_row_groups(), 1);
1472
1473            let col_idx: usize = rand.random_range(0..13);
1474            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1475
1476            let stream = builder
1477                .with_projection(mask.clone())
1478                .with_row_selection(selection.clone())
1479                .build()
1480                .expect("building stream");
1481
1482            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1483
1484            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1485
1486            assert_eq!(actual_rows, expected_rows);
1487        }
1488    }
1489
1490    #[tokio::test]
1491    async fn test_async_reader_zero_row_selector() {
1492        // See https://github.com/apache/arrow-rs/issues/2669: a selection beginning with a zero-row selector should not change the rows returned
1493        let testdata = arrow::util::test_util::parquet_test_data();
1494        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1495        let data = Bytes::from(std::fs::read(path).unwrap());
1496
1497        let mut rand = rng();
1498
1499        let mut expected_rows = 0;
1500        let mut total_rows = 0;
1501        let mut skip = false;
1502        let mut selectors = vec![];
1503
1504        selectors.push(RowSelector {
1505            row_count: 0,
1506            skip: false,
1507        });
1508
1509        while total_rows < 7300 {
1510            let row_count: usize = rand.random_range(1..100);
1511
1512            let row_count = row_count.min(7300 - total_rows);
1513
1514            selectors.push(RowSelector { row_count, skip });
1515
1516            total_rows += row_count;
1517            if !skip {
1518                expected_rows += row_count;
1519            }
1520
1521            skip = !skip;
1522        }
1523
1524        let selection = RowSelection::from(selectors);
1525
1526        let async_reader = TestReader::new(data.clone());
1527
1528        let options = ArrowReaderOptions::new().with_page_index(true);
1529        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1530            .await
1531            .unwrap();
1532
1533        assert_eq!(builder.metadata().num_row_groups(), 1);
1534
1535        let col_idx: usize = rand.random_range(0..13);
1536        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1537
1538        let stream = builder
1539            .with_projection(mask.clone())
1540            .with_row_selection(selection.clone())
1541            .build()
1542            .expect("building stream");
1543
1544        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1545
1546        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1547
1548        assert_eq!(actual_rows, expected_rows);
1549    }
1550
1551    #[tokio::test]
1552    async fn test_row_filter() {
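        // A single predicate row filter should prune the non-matching rows and issue
        // one fetch for the predicate column plus one for the projection.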
1553        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1554        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1555        let data = RecordBatch::try_from_iter([
1556            ("a", Arc::new(a) as ArrayRef),
1557            ("b", Arc::new(b) as ArrayRef),
1558        ])
1559        .unwrap();
1560
1561        let mut buf = Vec::with_capacity(1024);
1562        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1563        writer.write(&data).unwrap();
1564        writer.close().unwrap();
1565
1566        let data: Bytes = buf.into();
1567        let metadata = ParquetMetaDataReader::new()
1568            .parse_and_finish(&data)
1569            .unwrap();
1570        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1571
1572        let test = TestReader::new(data);
1573        let requests = test.requests.clone();
1574
1575        let a_scalar = StringArray::from_iter_values(["b"]);
1576        let a_filter = ArrowPredicateFn::new(
1577            ProjectionMask::leaves(&parquet_schema, vec![0]),
1578            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1579        );
1580
1581        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1582
1583        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1584        let stream = ParquetRecordBatchStreamBuilder::new(test)
1585            .await
1586            .unwrap()
1587            .with_projection(mask.clone())
1588            .with_batch_size(1024)
1589            .with_row_filter(filter)
1590            .build()
1591            .unwrap();
1592
1593        let batches: Vec<_> = stream.try_collect().await.unwrap();
1594        assert_eq!(batches.len(), 1);
1595
1596        let batch = &batches[0];
1597        assert_eq!(batch.num_columns(), 2);
1598
1599        // Filter should have kept only rows with "b" in column 0
1600        assert_eq!(
1601            batch.column(0).as_ref(),
1602            &StringArray::from_iter_values(["b", "b", "b"])
1603        );
1604        assert_eq!(
1605            batch.column(1).as_ref(),
1606            &StringArray::from_iter_values(["2", "3", "4"])
1607        );
1608
1609        // Should only have made 2 requests:
1610        // * First request fetches data for evaluating the predicate
1611        // * Second request fetches data for evaluating the projection
1612        assert_eq!(requests.lock().unwrap().len(), 2);
1613    }
1614
1615    #[tokio::test]
1616    async fn test_two_row_filters() {
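        // Two chained predicates should leave a single matching row and issue one
        // fetch per predicate plus one for the projection.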
1617        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1618        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1619        let c = Int32Array::from_iter(0..6);
1620        let data = RecordBatch::try_from_iter([
1621            ("a", Arc::new(a) as ArrayRef),
1622            ("b", Arc::new(b) as ArrayRef),
1623            ("c", Arc::new(c) as ArrayRef),
1624        ])
1625        .unwrap();
1626
1627        let mut buf = Vec::with_capacity(1024);
1628        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1629        writer.write(&data).unwrap();
1630        writer.close().unwrap();
1631
1632        let data: Bytes = buf.into();
1633        let metadata = ParquetMetaDataReader::new()
1634            .parse_and_finish(&data)
1635            .unwrap();
1636        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1637
1638        let test = TestReader::new(data);
1639        let requests = test.requests.clone();
1640
1641        let a_scalar = StringArray::from_iter_values(["b"]);
1642        let a_filter = ArrowPredicateFn::new(
1643            ProjectionMask::leaves(&parquet_schema, vec![0]),
1644            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1645        );
1646
1647        let b_scalar = StringArray::from_iter_values(["4"]);
1648        let b_filter = ArrowPredicateFn::new(
1649            ProjectionMask::leaves(&parquet_schema, vec![1]),
1650            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1651        );
1652
1653        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1654
1655        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1656        let stream = ParquetRecordBatchStreamBuilder::new(test)
1657            .await
1658            .unwrap()
1659            .with_projection(mask.clone())
1660            .with_batch_size(1024)
1661            .with_row_filter(filter)
1662            .build()
1663            .unwrap();
1664
1665        let batches: Vec<_> = stream.try_collect().await.unwrap();
1666        assert_eq!(batches.len(), 1);
1667
1668        let batch = &batches[0];
1669        assert_eq!(batch.num_rows(), 1);
1670        assert_eq!(batch.num_columns(), 2);
1671
1672        let col = batch.column(0);
1673        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1674        assert_eq!(val, "b");
1675
1676        let col = batch.column(1);
1677        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1678        assert_eq!(val, 3);
1679
1680        // Should only have made 3 requests
1681        // * First request fetches data for evaluating the first predicate
1682        // * Second request fetches data for evaluating the second predicate
1683        // * Third request fetches data for evaluating the projection
1684        assert_eq!(requests.lock().unwrap().len(), 3);
1685    }
1686
1687    #[tokio::test]
1688    async fn test_limit_multiple_row_groups() {
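        // Limit and offset should be applied correctly across two row groups of
        // three rows each.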
1689        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1690        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1691        let c = Int32Array::from_iter(0..6);
1692        let data = RecordBatch::try_from_iter([
1693            ("a", Arc::new(a) as ArrayRef),
1694            ("b", Arc::new(b) as ArrayRef),
1695            ("c", Arc::new(c) as ArrayRef),
1696        ])
1697        .unwrap();
1698
1699        let mut buf = Vec::with_capacity(1024);
1700        let props = WriterProperties::builder()
1701            .set_max_row_group_size(3)
1702            .build();
1703        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1704        writer.write(&data).unwrap();
1705        writer.close().unwrap();
1706
1707        let data: Bytes = buf.into();
1708        let metadata = ParquetMetaDataReader::new()
1709            .parse_and_finish(&data)
1710            .unwrap();
1711
1712        assert_eq!(metadata.num_row_groups(), 2);
1713
1714        let test = TestReader::new(data);
1715
1716        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1717            .await
1718            .unwrap()
1719            .with_batch_size(1024)
1720            .with_limit(4)
1721            .build()
1722            .unwrap();
1723
1724        let batches: Vec<_> = stream.try_collect().await.unwrap();
1725        // Expect one batch for each row group
1726        assert_eq!(batches.len(), 2);
1727
1728        let batch = &batches[0];
1729        // First batch should contain all rows
1730        assert_eq!(batch.num_rows(), 3);
1731        assert_eq!(batch.num_columns(), 3);
1732        let col2 = batch.column(2).as_primitive::<Int32Type>();
1733        assert_eq!(col2.values(), &[0, 1, 2]);
1734
1735        let batch = &batches[1];
1736        // Second batch should trigger the limit and only have one row
1737        assert_eq!(batch.num_rows(), 1);
1738        assert_eq!(batch.num_columns(), 3);
1739        let col2 = batch.column(2).as_primitive::<Int32Type>();
1740        assert_eq!(col2.values(), &[3]);
1741
1742        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1743            .await
1744            .unwrap()
1745            .with_offset(2)
1746            .with_limit(3)
1747            .build()
1748            .unwrap();
1749
1750        let batches: Vec<_> = stream.try_collect().await.unwrap();
1751        // Expect one batch for each row group
1752        assert_eq!(batches.len(), 2);
1753
1754        let batch = &batches[0];
1755        // First batch should contain one row
1756        assert_eq!(batch.num_rows(), 1);
1757        assert_eq!(batch.num_columns(), 3);
1758        let col2 = batch.column(2).as_primitive::<Int32Type>();
1759        assert_eq!(col2.values(), &[2]);
1760
1761        let batch = &batches[1];
1762        // Second batch should contain two rows
1763        assert_eq!(batch.num_rows(), 2);
1764        assert_eq!(batch.num_columns(), 3);
1765        let col2 = batch.column(2).as_primitive::<Int32Type>();
1766        assert_eq!(col2.values(), &[3, 4]);
1767
1768        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1769            .await
1770            .unwrap()
1771            .with_offset(4)
1772            .with_limit(20)
1773            .build()
1774            .unwrap();
1775
1776        let batches: Vec<_> = stream.try_collect().await.unwrap();
1777        // Should skip first row group
1778        assert_eq!(batches.len(), 1);
1779
1780        let batch = &batches[0];
1781        // First batch should contain two rows
1782        assert_eq!(batch.num_rows(), 2);
1783        assert_eq!(batch.num_columns(), 3);
1784        let col2 = batch.column(2).as_primitive::<Int32Type>();
1785        assert_eq!(col2.values(), &[4, 5]);
1786    }
1787
1788    #[tokio::test]
1789    async fn test_row_filter_with_index() {
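        // Row filters combined with the page index should return the expected
        // number of matching rows.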
1790        let testdata = arrow::util::test_util::parquet_test_data();
1791        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1792        let data = Bytes::from(std::fs::read(path).unwrap());
1793
1794        let metadata = ParquetMetaDataReader::new()
1795            .parse_and_finish(&data)
1796            .unwrap();
1797        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1798
1799        assert_eq!(metadata.num_row_groups(), 1);
1800
1801        let async_reader = TestReader::new(data.clone());
1802
1803        let a_filter =
1804            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1805                Ok(batch.column(0).as_boolean().clone())
1806            });
1807
1808        let b_scalar = Int8Array::from(vec![2]);
1809        let b_filter = ArrowPredicateFn::new(
1810            ProjectionMask::leaves(&parquet_schema, vec![2]),
1811            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1812        );
1813
1814        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1815
1816        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1817
1818        let options = ArrowReaderOptions::new().with_page_index(true);
1819        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1820            .await
1821            .unwrap()
1822            .with_projection(mask.clone())
1823            .with_batch_size(1024)
1824            .with_row_filter(filter)
1825            .build()
1826            .unwrap();
1827
1828        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1829
1830        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1831
1832        assert_eq!(total_rows, 730);
1833    }
1834
1835    #[tokio::test]
1836    async fn test_in_memory_row_group_sparse() {
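        // With a sparse RowSelection, only the selected pages should be fetched
        // when reading the row group.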
1837        let testdata = arrow::util::test_util::parquet_test_data();
1838        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1839        let data = Bytes::from(std::fs::read(path).unwrap());
1840
1841        let metadata = ParquetMetaDataReader::new()
1842            .with_page_indexes(true)
1843            .parse_and_finish(&data)
1844            .unwrap();
1845
1846        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1847
1848        let mut metadata_builder = metadata.into_builder();
1849        let mut row_groups = metadata_builder.take_row_groups();
1850        row_groups.truncate(1);
1851        let row_group_meta = row_groups.pop().unwrap();
1852
1853        let metadata = metadata_builder
1854            .add_row_group(row_group_meta)
1855            .set_column_index(None)
1856            .set_offset_index(Some(vec![offset_index.clone()]))
1857            .build();
1858
1859        let metadata = Arc::new(metadata);
1860
1861        let num_rows = metadata.row_group(0).num_rows();
1862
1863        assert_eq!(metadata.num_row_groups(), 1);
1864
1865        let async_reader = TestReader::new(data.clone());
1866
1867        let requests = async_reader.requests.clone();
1868        let (_, fields) = parquet_to_arrow_schema_and_fields(
1869            metadata.file_metadata().schema_descr(),
1870            ProjectionMask::all(),
1871            None,
1872        )
1873        .unwrap();
1874
1875        let _schema_desc = metadata.file_metadata().schema_descr();
1876
1877        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1878
1879        let reader_factory = ReaderFactory {
1880            metadata,
1881            fields: fields.map(Arc::new),
1882            input: async_reader,
1883            filter: None,
1884            limit: None,
1885            offset: None,
1886        };
1887
1888        let mut skip = true;
1889        let mut pages = offset_index[0].page_locations.iter().peekable();
1890
1891        // Set up a `RowSelection` so that we skip every other page, selecting the last page
1892        let mut selectors = vec![];
1893        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1894        while let Some(page) = pages.next() {
1895            let num_rows = if let Some(next_page) = pages.peek() {
1896                next_page.first_row_index - page.first_row_index
1897            } else {
1898                num_rows - page.first_row_index
1899            };
1900
1901            if skip {
1902                selectors.push(RowSelector::skip(num_rows as usize));
1903            } else {
1904                selectors.push(RowSelector::select(num_rows as usize));
1905                let start = page.offset as usize;
1906                let end = start + page.compressed_page_size as usize;
1907                expected_page_requests.push(start..end);
1908            }
1909            skip = !skip;
1910        }
1911
1912        let selection = RowSelection::from(selectors);
1913
1914        let (_factory, _reader) = reader_factory
1915            .read_row_group(0, Some(selection), projection.clone(), 48)
1916            .await
1917            .expect("reading row group");
1918
1919        let requests = requests.lock().unwrap();
1920
1921        assert_eq!(&requests[..], &expected_page_requests)
1922    }
1923
1924    #[tokio::test]
1925    async fn test_batch_size_overallocate() {
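        // The requested batch size should be clamped to the number of rows in the file.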
1926        let testdata = arrow::util::test_util::parquet_test_data();
1927        // `alltypes_plain.parquet` only has 8 rows
1928        let path = format!("{testdata}/alltypes_plain.parquet");
1929        let data = Bytes::from(std::fs::read(path).unwrap());
1930
1931        let async_reader = TestReader::new(data.clone());
1932
1933        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1934            .await
1935            .unwrap();
1936
1937        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1938
1939        let stream = builder
1940            .with_projection(ProjectionMask::all())
1941            .with_batch_size(1024)
1942            .build()
1943            .unwrap();
1944        assert_ne!(1024, file_rows);
1945        assert_eq!(stream.batch_size, file_rows);
1946    }
1947
1948    #[tokio::test]
1949    async fn test_get_row_group_column_bloom_filter_without_length() {
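        // Read a bloom filter from a file whose metadata has no bloom_filter_length.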
1950        let testdata = arrow::util::test_util::parquet_test_data();
1951        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1952        let data = Bytes::from(std::fs::read(path).unwrap());
1953        test_get_row_group_column_bloom_filter(data, false).await;
1954    }
1955
1956    #[tokio::test]
1957    async fn test_parquet_record_batch_stream_schema() {
1958        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1959            schema.flattened_fields().iter().map(|f| f.name()).collect()
1960        }
1961
1962        // ParquetRecordBatchReaderBuilder::schema differs from
1963        // ParquetRecordBatchReader::schema and RecordBatch::schema: the builder schema
1964        // keeps all fields and any custom schema metadata, while the reader and batch
1965        // schemas contain only the projected fields and no metadata. Test that this
1966        // behaviour remains consistent for both the synchronous and asynchronous
1967        // readers.
1968
1969        // Prepare data: a schema with nested fields and custom metadata
1970        let mut metadata = HashMap::with_capacity(1);
1971        metadata.insert("key".to_string(), "value".to_string());
1972
1973        let nested_struct_array = StructArray::from(vec![
1974            (
1975                Arc::new(Field::new("d", DataType::Utf8, true)),
1976                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1977            ),
1978            (
1979                Arc::new(Field::new("e", DataType::Utf8, true)),
1980                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1981            ),
1982        ]);
1983        let struct_array = StructArray::from(vec![
1984            (
1985                Arc::new(Field::new("a", DataType::Int32, true)),
1986                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1987            ),
1988            (
1989                Arc::new(Field::new("b", DataType::UInt64, true)),
1990                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1991            ),
1992            (
1993                Arc::new(Field::new(
1994                    "c",
1995                    nested_struct_array.data_type().clone(),
1996                    true,
1997                )),
1998                Arc::new(nested_struct_array) as ArrayRef,
1999            ),
2000        ]);
2001
2002        let schema =
2003            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2004        let record_batch = RecordBatch::from(struct_array)
2005            .with_schema(schema.clone())
2006            .unwrap();
2007
2008        // Write parquet with custom metadata in schema
2009        let mut file = tempfile().unwrap();
2010        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2011        writer.write(&record_batch).unwrap();
2012        writer.close().unwrap();
2013
2014        let all_fields = ["a", "b", "c", "d", "e"];
2015        // (leaf indices in the projection mask, expected field names in the output schema)
2016        let projections = [
2017            (vec![], vec![]),
2018            (vec![0], vec!["a"]),
2019            (vec![0, 1], vec!["a", "b"]),
2020            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2021            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2022        ];
2023
2024        // Ensure we're consistent for each of these projections
2025        for (indices, expected_projected_names) in projections {
2026            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2027                // Builder schema should preserve all fields and metadata
2028                assert_eq!(get_all_field_names(&builder), all_fields);
2029                assert_eq!(builder.metadata, metadata);
2030                // Reader & batch schema should show only projected fields, and no metadata
2031                assert_eq!(get_all_field_names(&reader), expected_projected_names);
2032                assert_eq!(reader.metadata, HashMap::default());
2033                assert_eq!(get_all_field_names(&batch), expected_projected_names);
2034                assert_eq!(batch.metadata, HashMap::default());
2035            };
2036
2037            let builder =
2038                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2039            let sync_builder_schema = builder.schema().clone();
2040            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2041            let mut reader = builder.with_projection(mask).build().unwrap();
2042            let sync_reader_schema = reader.schema();
2043            let batch = reader.next().unwrap().unwrap();
2044            let sync_batch_schema = batch.schema();
2045            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2046
2047            // The asynchronous reader should produce the same schemas
2048            let file = tokio::fs::File::from(file.try_clone().unwrap());
2049            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2050            let async_builder_schema = builder.schema().clone();
2051            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2052            let mut reader = builder.with_projection(mask).build().unwrap();
2053            let async_reader_schema = reader.schema().clone();
2054            let batch = reader.next().await.unwrap().unwrap();
2055            let async_batch_schema = batch.schema();
2056            assert_schemas(
2057                async_builder_schema,
2058                async_reader_schema,
2059                async_batch_schema,
2060            );
2061        }
2062    }
2063
2064    #[tokio::test]
2065    async fn test_get_row_group_column_bloom_filter_with_length() {
2066        // Rewrite into a new parquet file whose metadata records bloom_filter_length
2067        let testdata = arrow::util::test_util::parquet_test_data();
2068        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2069        let data = Bytes::from(std::fs::read(path).unwrap());
2070        let async_reader = TestReader::new(data.clone());
2071        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2072            .await
2073            .unwrap();
2074        let schema = builder.schema().clone();
2075        let stream = builder.build().unwrap();
2076        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2077
2078        let mut parquet_data = Vec::new();
2079        let props = WriterProperties::builder()
2080            .set_bloom_filter_enabled(true)
2081            .build();
2082        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2083        for batch in batches {
2084            writer.write(&batch).unwrap();
2085        }
2086        writer.close().unwrap();
2087
2088        // test the new parquet file
2089        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2090    }
2091
2092    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
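        // Shared helper: load the bloom filter for row group 0, column 0 and check
        // membership for a value that is present and one that is not.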
2093        let async_reader = TestReader::new(data.clone());
2094
2095        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2096            .await
2097            .unwrap();
2098
2099        let metadata = builder.metadata();
2100        assert_eq!(metadata.num_row_groups(), 1);
2101        let row_group = metadata.row_group(0);
2102        let column = row_group.column(0);
2103        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2104
2105        let sbbf = builder
2106            .get_row_group_column_bloom_filter(0, 0)
2107            .await
2108            .unwrap()
2109            .unwrap();
2110        assert!(sbbf.check(&"Hello"));
2111        assert!(!sbbf.check(&"Hello_Not_Exists"));
2112    }
2113
2114    #[tokio::test]
2115    async fn test_nested_skip() {
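        // Skipping and selecting rows in a nested list column across data page
        // boundaries should return exactly the selected number of rows.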
2116        let schema = Arc::new(Schema::new(vec![
2117            Field::new("col_1", DataType::UInt64, false),
2118            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2119        ]));
2120
2121        // Writer properties limiting data page and row group sizes
2122        let props = WriterProperties::builder()
2123            .set_data_page_row_count_limit(256)
2124            .set_write_batch_size(256)
2125            .set_max_row_group_size(1024);
2126
2127        // Write data
2128        let mut file = tempfile().unwrap();
2129        let mut writer =
2130            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2131
2132        let mut builder = ListBuilder::new(StringBuilder::new());
2133        for id in 0..1024 {
2134            match id % 3 {
2135                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2136                1 => builder.append_value([Some(format!("id_{id}"))]),
2137                _ => builder.append_null(),
2138            }
2139        }
2140        let refs = vec![
2141            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2142            Arc::new(builder.finish()) as ArrayRef,
2143        ];
2144
2145        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2146        writer.write(&batch).unwrap();
2147        writer.close().unwrap();
2148
2149        let selections = [
2150            RowSelection::from(vec![
2151                RowSelector::skip(313),
2152                RowSelector::select(1),
2153                RowSelector::skip(709),
2154                RowSelector::select(1),
2155            ]),
2156            RowSelection::from(vec![
2157                RowSelector::skip(255),
2158                RowSelector::select(1),
2159                RowSelector::skip(767),
2160                RowSelector::select(1),
2161            ]),
2162            RowSelection::from(vec![
2163                RowSelector::select(255),
2164                RowSelector::skip(1),
2165                RowSelector::select(767),
2166                RowSelector::skip(1),
2167            ]),
2168            RowSelection::from(vec![
2169                RowSelector::skip(254),
2170                RowSelector::select(1),
2171                RowSelector::select(1),
2172                RowSelector::skip(767),
2173                RowSelector::select(1),
2174            ]),
2175        ];
2176
2177        for selection in selections {
2178            let expected = selection.row_count();
2179            // Read data
2180            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2181                tokio::fs::File::from_std(file.try_clone().unwrap()),
2182                ArrowReaderOptions::new().with_page_index(true),
2183            )
2184            .await
2185            .unwrap();
2186
2187            reader = reader.with_row_selection(selection);
2188
2189            let mut stream = reader.build().unwrap();
2190
2191            let mut total_rows = 0;
2192            while let Some(rb) = stream.next().await {
2193                let rb = rb.unwrap();
2194                total_rows += rb.num_rows();
2195            }
2196            assert_eq!(total_rows, expected);
2197        }
2198    }
2199
2200    #[tokio::test]
2201    async fn test_row_filter_nested() {
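        // Row filters where one predicate reads a leaf inside a struct column should
        // behave like the flat case, with one fetch per predicate plus the projection.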
2202        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2203        let b = StructArray::from(vec![
2204            (
2205                Arc::new(Field::new("aa", DataType::Utf8, true)),
2206                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2207            ),
2208            (
2209                Arc::new(Field::new("bb", DataType::Utf8, true)),
2210                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2211            ),
2212        ]);
2213        let c = Int32Array::from_iter(0..6);
2214        let data = RecordBatch::try_from_iter([
2215            ("a", Arc::new(a) as ArrayRef),
2216            ("b", Arc::new(b) as ArrayRef),
2217            ("c", Arc::new(c) as ArrayRef),
2218        ])
2219        .unwrap();
2220
2221        let mut buf = Vec::with_capacity(1024);
2222        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2223        writer.write(&data).unwrap();
2224        writer.close().unwrap();
2225
2226        let data: Bytes = buf.into();
2227        let metadata = ParquetMetaDataReader::new()
2228            .parse_and_finish(&data)
2229            .unwrap();
2230        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2231
2232        let test = TestReader::new(data);
2233        let requests = test.requests.clone();
2234
2235        let a_scalar = StringArray::from_iter_values(["b"]);
2236        let a_filter = ArrowPredicateFn::new(
2237            ProjectionMask::leaves(&parquet_schema, vec![0]),
2238            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2239        );
2240
2241        let b_scalar = StringArray::from_iter_values(["4"]);
2242        let b_filter = ArrowPredicateFn::new(
2243            ProjectionMask::leaves(&parquet_schema, vec![2]),
2244            move |batch| {
2245                // Filter on the second leaf of the struct (`bb`); after projection it is the struct's only child, hence column(0).
2246                let struct_array = batch
2247                    .column(0)
2248                    .as_any()
2249                    .downcast_ref::<StructArray>()
2250                    .unwrap();
2251                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2252            },
2253        );
2254
2255        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2256
2257        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2258        let stream = ParquetRecordBatchStreamBuilder::new(test)
2259            .await
2260            .unwrap()
2261            .with_projection(mask.clone())
2262            .with_batch_size(1024)
2263            .with_row_filter(filter)
2264            .build()
2265            .unwrap();
2266
2267        let batches: Vec<_> = stream.try_collect().await.unwrap();
2268        assert_eq!(batches.len(), 1);
2269
2270        let batch = &batches[0];
2271        assert_eq!(batch.num_rows(), 1);
2272        assert_eq!(batch.num_columns(), 2);
2273
2274        let col = batch.column(0);
2275        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2276        assert_eq!(val, "b");
2277
2278        let col = batch.column(1);
2279        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2280        assert_eq!(val, 3);
2281
2282        // Should only have made 3 requests
2283        // * First request fetches data for evaluating the first predicate
2284        // * Second request fetches data for evaluating the second predicate
2285        // * Third request fetches data for evaluating the projection
2286        assert_eq!(requests.lock().unwrap().len(), 3);
2287    }
2288
2289    #[tokio::test]
2290    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
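        // Regression test: an explicitly empty offset index must not panic when
        // reading row groups.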
2291        use tokio::fs::File;
2292        let testdata = arrow::util::test_util::parquet_test_data();
2293        let path = format!("{testdata}/alltypes_plain.parquet");
2294        let mut file = File::open(&path).await.unwrap();
2295        let file_size = file.metadata().await.unwrap().len();
2296        let mut metadata = ParquetMetaDataReader::new()
2297            .with_page_indexes(true)
2298            .load_and_finish(&mut file, file_size)
2299            .await
2300            .unwrap();
2301
2302        metadata.set_offset_index(Some(vec![]));
2303        let options = ArrowReaderOptions::new().with_page_index(true);
2304        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2305        let reader =
2306            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2307                .build()
2308                .unwrap();
2309
2310        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2311        assert_eq!(result.len(), 1);
2312    }
2313
2314    #[tokio::test]
2315    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
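        // Regression test: a fully populated offset index must also read without panicking.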
2316        use tokio::fs::File;
2317        let testdata = arrow::util::test_util::parquet_test_data();
2318        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2319        let mut file = File::open(&path).await.unwrap();
2320        let file_size = file.metadata().await.unwrap().len();
2321        let metadata = ParquetMetaDataReader::new()
2322            .with_page_indexes(true)
2323            .load_and_finish(&mut file, file_size)
2324            .await
2325            .unwrap();
2326
2327        let options = ArrowReaderOptions::new().with_page_index(true);
2328        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2329        let reader =
2330            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2331                .build()
2332                .unwrap();
2333
2334        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2335        assert_eq!(result.len(), 8);
2336    }
2337
2338    #[tokio::test]
2339    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
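        // Regression test: metadata round-tripped through ParquetMetaDataWriter and
        // ParquetMetaDataReader must still be readable without panicking.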
2340        use tempfile::TempDir;
2341        use tokio::fs::File;
2342        fn write_metadata_to_local_file(
2343            metadata: ParquetMetaData,
2344            file: impl AsRef<std::path::Path>,
2345        ) {
2346            use crate::file::metadata::ParquetMetaDataWriter;
2347            use std::fs::File;
2348            let file = File::create(file).unwrap();
2349            ParquetMetaDataWriter::new(file, &metadata)
2350                .finish()
2351                .unwrap()
2352        }
2353
2354        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2355            use std::fs::File;
2356            let file = File::open(file).unwrap();
2357            ParquetMetaDataReader::new()
2358                .with_page_indexes(true)
2359                .parse_and_finish(&file)
2360                .unwrap()
2361        }
2362
2363        let testdata = arrow::util::test_util::parquet_test_data();
2364        let path = format!("{testdata}/alltypes_plain.parquet");
2365        let mut file = File::open(&path).await.unwrap();
2366        let file_size = file.metadata().await.unwrap().len();
2367        let metadata = ParquetMetaDataReader::new()
2368            .with_page_indexes(true)
2369            .load_and_finish(&mut file, file_size)
2370            .await
2371            .unwrap();
2372
2373        let tempdir = TempDir::new().unwrap();
2374        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2375        write_metadata_to_local_file(metadata, &metadata_path);
2376        let metadata = read_metadata_from_local_file(&metadata_path);
2377
2378        let options = ArrowReaderOptions::new().with_page_index(true);
2379        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2380        let reader =
2381            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2382                .build()
2383                .unwrap();
2384
2385        // This read previously panicked; it should now succeed
2386        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2387        assert_eq!(result.len(), 1);
2388    }
2389}