parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46    RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
51use crate::bloom_filter::{
52    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
53};
54use crate::column::page::{PageIterator, PageReader};
55use crate::errors::{ParquetError, Result};
56use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
57use crate::file::page_index::offset_index::OffsetIndexMetaData;
58use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
72/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
73///
74/// Notes:
75///
76/// 1. There is a default implementation for types that implement [`AsyncRead`]
77///    and [`AsyncSeek`], for example [`tokio::fs::File`].
78///
79/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
80///    is enabled, implements this interface for [`ObjectStore`].
81///
82/// [`ObjectStore`]: object_store::ObjectStore
83///
84/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
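///
/// # Example
///
/// A minimal sketch of an in-memory implementation (the `InMemoryReader` type
/// below is illustrative and not part of this crate):
///
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// /// Serves an already fully-buffered parquet file from memory
/// struct InMemoryReader {
///     data: Bytes,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // The whole file is in memory, so just slice out the requested range
///         let data = self.data.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // Parse the footer from the buffered bytes; an implementation could
///         // instead return cached or pre-fetched metadata here
///         let data = self.data.clone();
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```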
85pub trait AsyncFileReader: Send {
86    /// Retrieve the bytes in `range`
87    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially.
90    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91        async move {
92            let mut result = Vec::with_capacity(ranges.len());
93
94            for range in ranges.into_iter() {
95                let data = self.get_bytes(range).await?;
96                result.push(data);
97            }
98
99            Ok(result)
100        }
101        .boxed()
102    }
103
104    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
105    ///
106    /// This is an asynchronous operation as it may involve reading the file
107    /// footer and potentially other metadata from disk or a remote source.
108    ///
109    /// Reading data from Parquet requires the metadata to understand the
110    /// schema, row groups, and location of pages within the file. This metadata
111    /// is stored primarily in the footer of the Parquet file, and can be read using
112    /// [`ParquetMetaDataReader`].
113    ///
114    /// However, implementations can significantly speed up reading Parquet by
115    /// supplying cached metadata or pre-fetched metadata via this API.
116    ///
117    /// # Parameters
118    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
119    ///   and other options that affect how the metadata is read.
120    fn get_metadata<'a>(
121        &'a mut self,
122        options: Option<&'a ArrowReaderOptions>,
123    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
127impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129        self.as_mut().get_bytes(range)
130    }
131
132    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133        self.as_mut().get_byte_ranges(ranges)
134    }
135
136    fn get_metadata<'a>(
137        &'a mut self,
138        options: Option<&'a ArrowReaderOptions>,
139    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140        self.as_mut().get_metadata(options)
141    }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146        async move {
147            self.seek(SeekFrom::End(-(suffix as i64))).await?;
148            let mut buf = Vec::with_capacity(suffix);
149            self.take(suffix as _).read_to_end(&mut buf).await?;
150            Ok(buf.into())
151        }
152        .boxed()
153    }
154}
155
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158        async move {
159            self.seek(SeekFrom::Start(range.start)).await?;
160
161            let to_read = range.end - range.start;
162            let mut buffer = Vec::with_capacity(to_read.try_into()?);
163            let read = self.take(to_read).read_to_end(&mut buffer).await?;
164            if read as u64 != to_read {
165                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166            }
167
168            Ok(buffer.into())
169        }
170        .boxed()
171    }
172
173    fn get_metadata<'a>(
174        &'a mut self,
175        options: Option<&'a ArrowReaderOptions>,
176    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177        async move {
178            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180            );
181
182            #[cfg(feature = "encryption")]
183            let metadata_reader = metadata_reader.with_decryption_properties(
184                options.and_then(|o| o.file_decryption_properties.as_ref()),
185            );
186
187            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
188            Ok(Arc::new(parquet_metadata))
189        }
190        .boxed()
191    }
192}
193
194impl ArrowReaderMetadata {
195    /// Returns a new [`ArrowReaderMetadata`] for this builder
196    ///
197    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
198    pub async fn load_async<T: AsyncFileReader>(
199        input: &mut T,
200        options: ArrowReaderOptions,
201    ) -> Result<Self> {
202        let metadata = input.get_metadata(Some(&options)).await?;
203        Self::try_new(metadata, options)
204    }
205}
206
207#[doc(hidden)]
208// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers from async
209//
210// Allows sharing the same builder for both the sync and async versions, whilst also not
211// breaking the pre-existing ParquetRecordBatchStreamBuilder API
212pub struct AsyncReader<T>(T);
213
214/// A builder for reading parquet files from an `async` source as a [`ParquetRecordBatchStream`]
215///
216/// This can be used to decode a Parquet file in streaming fashion (without
217/// downloading the whole file at once) from a remote source, such as an object store.
218///
219/// This builder handles reading the parquet file metadata, allowing consumers
220/// to use this information to select what specific columns, row groups, etc.
221/// they wish to be read by the resulting stream.
222///
223/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
224///
225/// See [`ArrowReaderBuilder`] for additional member functions
226pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
227
228impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
229    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
230    /// specified source.
231    ///
232    /// # Example
233    /// ```
234    /// # #[tokio::main(flavor="current_thread")]
235    /// # async fn main() {
236    /// #
237    /// # use arrow_array::RecordBatch;
238    /// # use arrow::util::pretty::pretty_format_batches;
239    /// # use futures::TryStreamExt;
240    /// #
241    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
242    /// #
243    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
244    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
245    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
246    /// #     assert_eq!(
247    /// #          &actual_lines, expected_lines,
248    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
249    /// #          expected_lines, actual_lines
250    /// #      );
251    /// #  }
252    /// #
253    /// # let testdata = arrow::util::test_util::parquet_test_data();
254    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
255    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
256    /// // another async I/O reader such as a reader from an object store.
257    /// let file = tokio::fs::File::open(path).await.unwrap();
258    ///
259    /// // Configure options for reading from the async source
260    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
261    ///     .await
262    ///     .unwrap();
263    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
264    /// // a stream that can be used to incrementally read the data in batches
265    /// let stream = builder.build().unwrap();
266    /// // In this example, we collect the stream into a Vec<RecordBatch>
267    /// // but real applications would likely process the batches as they are read
268    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
269    /// // Demonstrate the results are as expected
270    /// assert_batches_eq(
271    ///     &results,
272    ///     &[
273    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
274    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
275    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
276    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
277    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
278    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
279    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
280    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
281    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
282    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
283    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
284    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
285    ///      ],
286    ///  );
287    /// # }
288    /// ```
289    ///
290    /// # Example configuring options and reading metadata
291    ///
292    /// There are many options that control the behavior of the reader, such as
293    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
294    ///
295    /// ```
296    /// # #[tokio::main(flavor="current_thread")]
297    /// # async fn main() {
298    /// #
299    /// # use arrow_array::RecordBatch;
300    /// # use arrow::util::pretty::pretty_format_batches;
301    /// # use futures::TryStreamExt;
302    /// #
303    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
304    /// #
305    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
306    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
307    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
308    /// #     assert_eq!(
309    /// #          &actual_lines, expected_lines,
310    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
311    /// #          expected_lines, actual_lines
312    /// #      );
313    /// #  }
314    /// #
315    /// # let testdata = arrow::util::test_util::parquet_test_data();
316    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
317    /// // As before, use tokio::fs::File to read data using async I/O.
318    /// let file = tokio::fs::File::open(path).await.unwrap();
319    ///
320    /// // Configure options for reading from the async source. In this case we set the batch size
321    /// // to 3, which produces batches of up to 3 rows at a time.
322    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
323    ///     .await
324    ///     .unwrap()
325    ///     .with_batch_size(3);
326    ///
327    /// // We can also read the metadata to inspect the schema and other metadata
328    /// // before actually reading the data
329    /// let file_metadata = builder.metadata().file_metadata();
330    /// // Specify that we only want to read the columns at indices 1, 2, and 6
331    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
332    ///
333    /// let stream = builder.with_projection(mask).build().unwrap();
334    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
335    /// // Print out the results
336    /// assert_batches_eq(
337    ///     &results,
338    ///     &[
339    ///         "+----------+-------------+-----------+",
340    ///         "| bool_col | tinyint_col | float_col |",
341    ///         "+----------+-------------+-----------+",
342    ///         "| true     | 0           | 0.0       |",
343    ///         "| false    | 1           | 1.1       |",
344    ///         "| true     | 0           | 0.0       |",
345    ///         "| false    | 1           | 1.1       |",
346    ///         "| true     | 0           | 0.0       |",
347    ///         "| false    | 1           | 1.1       |",
348    ///         "| true     | 0           | 0.0       |",
349    ///         "| false    | 1           | 1.1       |",
350    ///         "+----------+-------------+-----------+",
351    ///      ],
352    ///  );
353    ///
354    /// // The file has 8 rows, so with the batch size set to 3 we expect
355    /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
356    /// assert_eq!(results.len(), 3);
357    /// # }
358    /// ```
359    pub async fn new(input: T) -> Result<Self> {
360        Self::new_with_options(input, Default::default()).await
361    }
362
363    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
364    /// and [`ArrowReaderOptions`].
365    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
366        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
367        Ok(Self::new_with_metadata(input, metadata))
368    }
369
370    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
371    ///
372    /// This allows loading the metadata once and using it to create multiple builders with
373    /// potentially different settings, which can then be read in parallel.
374    ///
375    /// # Example of reading from multiple streams in parallel
376    ///
377    /// ```
378    /// # use std::fs::metadata;
379    /// # use std::sync::Arc;
380    /// # use bytes::Bytes;
381    /// # use arrow_array::{Int32Array, RecordBatch};
382    /// # use arrow_schema::{DataType, Field, Schema};
383    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
384    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
385    /// # use tempfile::tempfile;
386    /// # use futures::StreamExt;
387    /// # #[tokio::main(flavor="current_thread")]
388    /// # async fn main() {
389    /// #
390    /// # let mut file = tempfile().unwrap();
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// // open file with parquet data
397    /// let mut file = tokio::fs::File::from_std(file);
398    /// // load metadata once
399    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
400    /// // create two readers, a and b, from the same underlying file
401    /// // without reading the metadata again
402    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
403    ///     file.try_clone().await.unwrap(),
404    ///     meta.clone()
405    /// ).build().unwrap();
406    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
407    ///
408    /// // Can read batches from both readers in parallel
409    /// assert_eq!(
410    ///   a.next().await.unwrap().unwrap(),
411    ///   b.next().await.unwrap().unwrap(),
412    /// );
413    /// # }
414    /// ```
415    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
416        Self::new_builder(AsyncReader(input), metadata)
417    }
418
419    /// Read bloom filter for a column in a row group
420    ///
421    /// Returns `None` if the column does not have a bloom filter
422    ///
423    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
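    ///
    /// # Example
    ///
    /// A sketch of probing a bloom filter (here `file`, `row_group_idx`, `column_idx`
    /// and `needle` are assumed to exist in the caller's context):
    ///
    /// ```ignore
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// if let Some(sbbf) = builder
    ///     .get_row_group_column_bloom_filter(row_group_idx, column_idx)
    ///     .await?
    /// {
    ///     // A bloom filter can prove a value is absent, but never that it is present
    ///     if !sbbf.check(&needle) {
    ///         // `needle` is definitely not in this column chunk, so it can be skipped
    ///     }
    /// }
    /// ```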
424    pub async fn get_row_group_column_bloom_filter(
425        &mut self,
426        row_group_idx: usize,
427        column_idx: usize,
428    ) -> Result<Option<Sbbf>> {
429        let metadata = self.metadata.row_group(row_group_idx);
430        let column_metadata = metadata.column(column_idx);
431
432        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
433            offset
434                .try_into()
435                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
436        } else {
437            return Ok(None);
438        };
439
440        let buffer = match column_metadata.bloom_filter_length() {
441            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
442            None => self
443                .input
444                .0
445                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
446        }
447        .await?;
448
449        let (header, bitset_offset) =
450            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
451
452        match header.algorithm {
453            BloomFilterAlgorithm::BLOCK => {
454                // this match exists to future proof the singleton algorithm enum
455            }
456        }
457        match header.compression {
458            BloomFilterCompression::UNCOMPRESSED => {
459                // this match exists to future proof the singleton compression enum
460            }
461        }
462        match header.hash {
463            BloomFilterHash::XXHASH => {
464                // this match exists to future proof the singleton hash enum
465            }
466        }
467
468        let bitset = match column_metadata.bloom_filter_length() {
469            Some(_) => buffer.slice(
470                (TryInto::<usize>::try_into(bitset_offset).unwrap()
471                    - TryInto::<usize>::try_into(offset).unwrap())..,
472            ),
473            None => {
474                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
475                    ParquetError::General("Bloom filter length is invalid".to_string())
476                })?;
477                self.input
478                    .0
479                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
480                    .await?
481            }
482        };
483        Ok(Some(Sbbf::new(&bitset)))
484    }
485
486    /// Build a new [`ParquetRecordBatchStream`]
487    ///
488    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
489    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
490        let num_row_groups = self.metadata.row_groups().len();
491
492        let row_groups = match self.row_groups {
493            Some(row_groups) => {
494                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
495                    return Err(general_err!(
496                        "row group {} out of bounds 0..{}",
497                        col,
498                        num_row_groups
499                    ));
500                }
501                row_groups.into()
502            }
503            None => (0..self.metadata.row_groups().len()).collect(),
504        };
505
506        // Try to avoid allocating a large buffer
507        let batch_size = self
508            .batch_size
509            .min(self.metadata.file_metadata().num_rows() as usize);
510        let reader_factory = ReaderFactory {
511            input: self.input.0,
512            filter: self.filter,
513            metadata: self.metadata.clone(),
514            fields: self.fields,
515            limit: self.limit,
516            offset: self.offset,
517            metrics: self.metrics,
518            max_predicate_cache_size: self.max_predicate_cache_size,
519        };
520
521        // Ensure schema of ParquetRecordBatchStream respects projection, and does
522        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
523        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
524            Some(DataType::Struct(fields)) => {
525                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
526            }
527            None => Fields::empty(),
528            _ => unreachable!("Must be Struct for root type"),
529        };
530        let schema = Arc::new(Schema::new(projected_fields));
531
532        Ok(ParquetRecordBatchStream {
533            metadata: self.metadata,
534            batch_size,
535            row_groups,
536            projection: self.projection,
537            selection: self.selection,
538            schema,
539            reader_factory: Some(reader_factory),
540            state: StreamState::Init,
541        })
542    }
543}
544
545/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
546///
547/// Note: If all rows are filtered out in the row group (e.g. by filters, limit or
548/// offset), returns `None` for the reader.
549type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
550
551/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
552/// [`ParquetRecordBatchReader`]
553struct ReaderFactory<T> {
554    metadata: Arc<ParquetMetaData>,
555
556    /// Top level parquet schema
557    fields: Option<Arc<ParquetField>>,
558
559    input: T,
560
561    /// Optional filter
562    filter: Option<RowFilter>,
563
564    /// Limit to apply to remaining row groups.  
565    limit: Option<usize>,
566
567    /// Offset to apply to remaining row groups
568    offset: Option<usize>,
569
570    /// Metrics
571    metrics: ArrowReaderMetrics,
572
573    /// Maximum size of the predicate cache
574    max_predicate_cache_size: usize,
575}
576
577impl<T> ReaderFactory<T>
578where
579    T: AsyncFileReader + Send,
580{
581    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
582    ///
583    /// Updates the `limit` and `offset` of the reader factory
584    ///
585    /// Note: this captures self so that the resulting future has a static lifetime
586    async fn read_row_group(
587        mut self,
588        row_group_idx: usize,
589        selection: Option<RowSelection>,
590        projection: ProjectionMask,
591        batch_size: usize,
592    ) -> ReadResult<T> {
593        // TODO: calling build_array multiple times is wasteful
594
595        let meta = self.metadata.row_group(row_group_idx);
596        let offset_index = self
597            .metadata
598            .offset_index()
599            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
600            .filter(|index| !index.is_empty())
601            .map(|x| x[row_group_idx].as_slice());
602
603        // Cache columns that are both used by the filters and selected in the output so their data can be reused
604        let cache_projection = match self.compute_cache_projection(&projection) {
605            Some(projection) => projection,
606            None => ProjectionMask::none(meta.columns().len()),
607        };
608        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
609            batch_size,
610            self.max_predicate_cache_size,
611        )));
612
613        let mut row_group = InMemoryRowGroup {
614            // schema: meta.schema_descr_ptr(),
615            row_count: meta.num_rows() as usize,
616            column_chunks: vec![None; meta.columns().len()],
617            offset_index,
618            row_group_idx,
619            metadata: self.metadata.as_ref(),
620        };
621
622        let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
623
624        let filter = self.filter.as_mut();
625        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627        // Update selection based on any filters
628        if let Some(filter) = filter {
629            let cache_options = cache_options_builder.clone().producer();
630
631            for predicate in filter.predicates.iter_mut() {
632                if !plan_builder.selects_any() {
633                    return Ok((self, None)); // ruled out entire row group
634                }
635
636                // (pre) Fetch only the columns that are selected by the predicate
637                let selection = plan_builder.selection();
638                // Fetch predicate columns; expand selection only for cached predicate columns
639                let cache_mask = Some(&cache_projection);
640                row_group
641                    .fetch(
642                        &mut self.input,
643                        predicate.projection(),
644                        selection,
645                        batch_size,
646                        cache_mask,
647                    )
648                    .await?;
649
650                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651                    .with_cache_options(Some(&cache_options))
652                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655            }
656        }
657
658        // Compute the number of rows in the selection before applying limit and offset
659        let rows_before = plan_builder
660            .num_rows_selected()
661            .unwrap_or(row_group.row_count);
662
663        if rows_before == 0 {
664            return Ok((self, None)); // ruled out entire row group
665        }
666
667        // Apply any limit and offset
668        let plan_builder = plan_builder
669            .limited(row_group.row_count)
670            .with_offset(self.offset)
671            .with_limit(self.limit)
672            .build_limited();
673
674        let rows_after = plan_builder
675            .num_rows_selected()
676            .unwrap_or(row_group.row_count);
677
678        // Update running offset and limit for after the current row group is read
679        if let Some(offset) = &mut self.offset {
680            // The reduction is due to either offset or limit; since limit is only applied
681            // after the offset has been "exhausted", a saturating subtraction suffices here
682            *offset = offset.saturating_sub(rows_before - rows_after)
683        }
684
685        if rows_after == 0 {
686            return Ok((self, None)); // ruled out entire row group
687        }
688
689        if let Some(limit) = &mut self.limit {
690            *limit -= rows_after;
691        }
692        // fetch the pages needed for decoding
693        row_group
694            // Final projection fetch shouldn't expand selection for cache; pass None
695            .fetch(
696                &mut self.input,
697                &projection,
698                plan_builder.selection(),
699                batch_size,
700                None,
701            )
702            .await?;
703
704        let plan = plan_builder.build();
705
706        let cache_options = cache_options_builder.consumer();
707        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708            .with_cache_options(Some(&cache_options))
709            .build_array_reader(self.fields.as_deref(), &projection)?;
710
711        let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713        Ok((self, Some(reader)))
714    }
715
716    /// Compute which columns are used both in filters and in the final (output) projection
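    ///
    /// For example (illustrative column indices): if the predicates reference leaf
    /// columns `{0, 1}` and the output projection selects `{1, 2}`, only column `1`
    /// is both filtered and projected, so only column `1` is worth caching.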
717    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718        // Do not compute the projection mask if the predicate cache is disabled
719        if self.max_predicate_cache_size == 0 {
720            return None;
721        }
722
723        let filters = self.filter.as_ref()?;
724        let mut cache_projection = filters.predicates.first()?.projection().clone();
725        for predicate in filters.predicates.iter() {
726            cache_projection.union(predicate.projection());
727        }
728        cache_projection.intersect(projection);
729        self.exclude_nested_columns_from_cache(&cache_projection)
730    }
731
732    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
733    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
734        let schema = self.metadata.file_metadata().schema_descr();
735        let num_leaves = schema.num_columns();
736
737        // Count how many leaves each root column has
738        let num_roots = schema.root_schema().get_fields().len();
739        let mut root_leaf_counts = vec![0usize; num_roots];
740        for leaf_idx in 0..num_leaves {
741            let root_idx = schema.get_column_root_idx(leaf_idx);
742            root_leaf_counts[root_idx] += 1;
743        }
744
745        // Keep only leaves whose root has exactly one leaf (non-nested)
746        let mut included_leaves = Vec::new();
747        for leaf_idx in 0..num_leaves {
748            if mask.leaf_included(leaf_idx) {
749                let root_idx = schema.get_column_root_idx(leaf_idx);
750                if root_leaf_counts[root_idx] == 1 {
751                    included_leaves.push(leaf_idx);
752                }
753            }
754        }
755
756        if included_leaves.is_empty() {
757            None
758        } else {
759            Some(ProjectionMask::leaves(schema, included_leaves))
760        }
761    }
762}
763
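/// The internal state machine for `ParquetRecordBatchStream`: when polled as a stream,
/// `Init` kicks off reading the next row group (`Reading`), the fetched data is then
/// decoded (`Decoding`), and the state returns to `Init` for the following row group;
/// `Error` is terminal.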
764enum StreamState<T> {
765    /// At the start of a new row group, or the end of the parquet stream
766    Init,
767    /// Decoding a batch
768    Decoding(ParquetRecordBatchReader),
769    /// Reading data from input
770    Reading(BoxFuture<'static, ReadResult<T>>),
771    /// Error
772    Error,
773}
774
775impl<T> std::fmt::Debug for StreamState<T> {
776    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
777        match self {
778            StreamState::Init => write!(f, "StreamState::Init"),
779            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
780            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
781            StreamState::Error => write!(f, "StreamState::Error"),
782        }
783    }
784}
785
786/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
787///
788/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
789/// allowing users to decode record batches separately from I/O.
790///
791/// # I/O Buffering
792///
793/// `ParquetRecordBatchStream` buffers *all* data pages selected after applying the
794/// projection and any predicates (row filters), and decodes the rows from those buffered pages.
795///
796/// For example, if all rows and columns are selected, the entire row group is
797/// buffered in memory during decode. This minimizes the number of I/O operations
798/// required, which is especially important for object stores, where I/O operations
799/// have latencies in the hundreds of milliseconds.
800///
802/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
803pub struct ParquetRecordBatchStream<T> {
804    metadata: Arc<ParquetMetaData>,
805
806    schema: SchemaRef,
807
808    row_groups: VecDeque<usize>,
809
810    projection: ProjectionMask,
811
812    batch_size: usize,
813
814    selection: Option<RowSelection>,
815
816    /// This is an option so it can be moved into a future
817    reader_factory: Option<ReaderFactory<T>>,
818
819    state: StreamState<T>,
820}
821
822impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
823    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
824        f.debug_struct("ParquetRecordBatchStream")
825            .field("metadata", &self.metadata)
826            .field("schema", &self.schema)
827            .field("batch_size", &self.batch_size)
828            .field("projection", &self.projection)
829            .field("state", &self.state)
830            .finish()
831    }
832}
833
834impl<T> ParquetRecordBatchStream<T> {
835    /// Returns the projected [`SchemaRef`] for reading the parquet file.
836    ///
837    /// Note that the schema metadata will be stripped here. See
838    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
839    pub fn schema(&self) -> &SchemaRef {
840        &self.schema
841    }
842}
843
844impl<T> ParquetRecordBatchStream<T>
845where
846    T: AsyncFileReader + Unpin + Send + 'static,
847{
848    /// Fetches the next row group from the stream.
849    ///
850    /// Users can continue to call this function to get row groups and decode them concurrently.
851    ///
852    /// ## Notes
853    ///
854    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
855    ///
856    /// ## Returns
857    ///
858    /// - `Ok(None)` if the stream has ended.
859    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
860    /// - `Ok(Some(reader))` which holds all the data for the row group.
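    ///
    /// ## Example
    ///
    /// A sketch of decoding row groups separately from I/O (`stream` is assumed to be
    /// a `ParquetRecordBatchStream` built as in the examples on [`ParquetRecordBatchStreamBuilder::new`]):
    ///
    /// ```ignore
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // `reader` is a synchronous ParquetRecordBatchReader, so decoding can be
    ///     // moved to another task or thread while the next row group is fetched
    ///     for batch in reader {
    ///         let batch = batch?;
    ///         // process `batch` ...
    ///     }
    /// }
    /// ```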
861    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
862        loop {
863            match &mut self.state {
864                StreamState::Decoding(_) | StreamState::Reading(_) => {
865                    return Err(ParquetError::General(
866                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
867                    ))
868                }
869                StreamState::Init => {
870                    let row_group_idx = match self.row_groups.pop_front() {
871                        Some(idx) => idx,
872                        None => return Ok(None),
873                    };
874
875                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
876
877                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
878
879                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
880
881                    let (reader_factory, maybe_reader) = reader_factory
882                        .read_row_group(
883                            row_group_idx,
884                            selection,
885                            self.projection.clone(),
886                            self.batch_size,
887                        )
888                        .await
889                        .inspect_err(|_| {
890                            self.state = StreamState::Error;
891                        })?;
892                    self.reader_factory = Some(reader_factory);
893
894                    if let Some(reader) = maybe_reader {
895                        return Ok(Some(reader));
896                    } else {
897                        // All rows skipped, read next row group
898                        continue;
899                    }
900                }
901                StreamState::Error => return Ok(None), // Ends the stream because an error occurred.
902            }
903        }
904    }
905}
906
907impl<T> Stream for ParquetRecordBatchStream<T>
908where
909    T: AsyncFileReader + Unpin + Send + 'static,
910{
911    type Item = Result<RecordBatch>;
912
913    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
914        loop {
915            match &mut self.state {
916                StreamState::Decoding(batch_reader) => match batch_reader.next() {
917                    Some(Ok(batch)) => {
918                        return Poll::Ready(Some(Ok(batch)));
919                    }
920                    Some(Err(e)) => {
921                        self.state = StreamState::Error;
922                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
923                    }
924                    None => self.state = StreamState::Init,
925                },
926                StreamState::Init => {
927                    let row_group_idx = match self.row_groups.pop_front() {
928                        Some(idx) => idx,
929                        None => return Poll::Ready(None),
930                    };
931
932                    let reader = self.reader_factory.take().expect("lost reader factory");
933
934                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
935
936                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
937
938                    let fut = reader
939                        .read_row_group(
940                            row_group_idx,
941                            selection,
942                            self.projection.clone(),
943                            self.batch_size,
944                        )
945                        .boxed();
946
947                    self.state = StreamState::Reading(fut)
948                }
949                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
950                    Ok((reader_factory, maybe_reader)) => {
951                        self.reader_factory = Some(reader_factory);
952                        match maybe_reader {
953                            // Read records from [`ParquetRecordBatchReader`]
954                            Some(reader) => self.state = StreamState::Decoding(reader),
955                            // All rows skipped, read next row group
956                            None => self.state = StreamState::Init,
957                        }
958                    }
959                    Err(e) => {
960                        self.state = StreamState::Error;
961                        return Poll::Ready(Some(Err(e)));
962                    }
963                },
964                StreamState::Error => return Poll::Ready(None), // Ends the stream because an error occurred.
965            }
966        }
967    }
968}
969
970/// An in-memory collection of column chunks
971struct InMemoryRowGroup<'a> {
972    offset_index: Option<&'a [OffsetIndexMetaData]>,
973    /// Column chunks for this row group
974    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
975    row_count: usize,
976    row_group_idx: usize,
977    metadata: &'a ParquetMetaData,
978}
979
980impl InMemoryRowGroup<'_> {
981    /// Fetches any additional column data specified in `projection` that is not already
982    /// present in `self.column_chunks`.
983    ///
984    /// If `selection` is provided, only the pages required for the selection
985    /// are fetched. Otherwise, all pages are fetched.
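    ///
    /// If `cache_mask` is provided, the selection for the columns it includes is
    /// expanded to batch boundaries before computing which pages to fetch
    /// (see `RowGroupCache`).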
986    async fn fetch<T: AsyncFileReader + Send>(
987        &mut self,
988        input: &mut T,
989        projection: &ProjectionMask,
990        selection: Option<&RowSelection>,
991        batch_size: usize,
992        cache_mask: Option<&ProjectionMask>,
993    ) -> Result<()> {
994        let metadata = self.metadata.row_group(self.row_group_idx);
995        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
996            let expanded_selection =
997                selection.expand_to_batch_boundaries(batch_size, self.row_count);
998            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
999            // `RowSelection`
1000            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
1001
1002            let fetch_ranges = self
1003                .column_chunks
1004                .iter()
1005                .zip(metadata.columns())
1006                .enumerate()
1007                .filter(|&(idx, (chunk, _chunk_meta))| {
1008                    chunk.is_none() && projection.leaf_included(idx)
1009                })
1010                .flat_map(|(idx, (_chunk, chunk_meta))| {
1011                    // If the first page does not start at the beginning of the column,
1012                    // then we need to also fetch a dictionary page.
1013                    let mut ranges: Vec<Range<u64>> = vec![];
1014                    let (start, _len) = chunk_meta.byte_range();
1015                    match offset_index[idx].page_locations.first() {
1016                        Some(first) if first.offset as u64 != start => {
1017                            ranges.push(start..first.offset as u64);
1018                        }
1019                        _ => (),
1020                    }
1021
1022                    // Expand selection to batch boundaries only for cached columns
1023                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1024                    if use_expanded {
1025                        ranges.extend(
1026                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1027                        );
1028                    } else {
1029                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1030                    }
1031                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1032
1033                    ranges
1034                })
1035                .collect();
1036
1037            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1038            let mut page_start_offsets = page_start_offsets.into_iter();
1039
1040            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1041                if chunk.is_some() || !projection.leaf_included(idx) {
1042                    continue;
1043                }
1044
1045                if let Some(offsets) = page_start_offsets.next() {
1046                    let mut chunks = Vec::with_capacity(offsets.len());
1047                    for _ in 0..offsets.len() {
1048                        chunks.push(chunk_data.next().unwrap());
1049                    }
1050
1051                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1052                        length: metadata.column(idx).byte_range().1 as usize,
1053                        data: offsets
1054                            .into_iter()
1055                            .map(|x| x as usize)
1056                            .zip(chunks.into_iter())
1057                            .collect(),
1058                    }))
1059                }
1060            }
1061        } else {
1062            let fetch_ranges = self
1063                .column_chunks
1064                .iter()
1065                .enumerate()
1066                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1067                .map(|(idx, _chunk)| {
1068                    let column = metadata.column(idx);
1069                    let (start, length) = column.byte_range();
1070                    start..(start + length)
1071                })
1072                .collect();
1073
1074            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1075
1076            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1077                if chunk.is_some() || !projection.leaf_included(idx) {
1078                    continue;
1079                }
1080
1081                if let Some(data) = chunk_data.next() {
1082                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
1083                        offset: metadata.column(idx).byte_range().0 as usize,
1084                        data,
1085                    }));
1086                }
1087            }
1088        }
1089
1090        Ok(())
1091    }
1092}
1093
1094impl RowGroups for InMemoryRowGroup<'_> {
1095    fn num_rows(&self) -> usize {
1096        self.row_count
1097    }
1098
1099    /// Return chunks for column i
1100    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1101        match &self.column_chunks[i] {
1102            None => Err(ParquetError::General(format!(
1103                "Invalid column index {i}, column was not fetched"
1104            ))),
1105            Some(data) => {
1106                let page_locations = self
1107                    .offset_index
1108                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
1109                    .filter(|index| !index.is_empty())
1110                    .map(|index| index[i].page_locations.clone());
1111                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1112                let page_reader = SerializedPageReader::new(
1113                    data.clone(),
1114                    column_chunk_metadata,
1115                    self.row_count,
1116                    page_locations,
1117                )?;
1118                let page_reader = page_reader.add_crypto_context(
1119                    self.row_group_idx,
1120                    i,
1121                    self.metadata,
1122                    column_chunk_metadata,
1123                )?;
1124
1125                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1126
1127                Ok(Box::new(ColumnChunkIterator {
1128                    reader: Some(Ok(page_reader)),
1129                }))
1130            }
1131        }
1132    }
1133}
1134
1135/// An in-memory column chunk
1136#[derive(Clone)]
1137enum ColumnChunkData {
1138    /// Column chunk data representing only a subset of data pages
1139    Sparse {
1140        /// Length of the full column chunk
1141        length: usize,
1142        /// Subset of data pages included in this sparse chunk.
1143        ///
1144        /// Each element is a tuple of (page offset within file, page data).
1145        /// Each entry is a complete page and the list is ordered by offset.
1146        data: Vec<(usize, Bytes)>,
1147    },
1148    /// Full column chunk and the offset within the original file
1149    Dense { offset: usize, data: Bytes },
1150}
1151
1152impl ColumnChunkData {
1153    /// Return the data for this column chunk at the given offset
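    ///
    /// For `Sparse` chunks, `start` must be the exact file offset of one of the fetched
    /// pages; for `Dense` chunks it may be any offset within the column chunk.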
1154    fn get(&self, start: u64) -> Result<Bytes> {
1155        match &self {
1156            ColumnChunkData::Sparse { data, .. } => data
1157                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1158                .map(|idx| data[idx].1.clone())
1159                .map_err(|_| {
1160                    ParquetError::General(format!(
1161                        "Invalid offset in sparse column chunk data: {start}"
1162                    ))
1163                }),
1164            ColumnChunkData::Dense { offset, data } => {
1165                let start = start as usize - *offset;
1166                Ok(data.slice(start..))
1167            }
1168        }
1169    }
1170}
1171
1172impl Length for ColumnChunkData {
1173    /// Return the total length of the full column chunk
1174    fn len(&self) -> u64 {
1175        match &self {
1176            ColumnChunkData::Sparse { length, .. } => *length as u64,
1177            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1178        }
1179    }
1180}
1181
1182impl ChunkReader for ColumnChunkData {
1183    type T = bytes::buf::Reader<Bytes>;
1184
1185    fn get_read(&self, start: u64) -> Result<Self::T> {
1186        Ok(self.get(start)?.reader())
1187    }
1188
1189    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1190        Ok(self.get(start)?.slice(..length))
1191    }
1192}
1193
1194/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1195struct ColumnChunkIterator {
1196    reader: Option<Result<Box<dyn PageReader>>>,
1197}
1198
1199impl Iterator for ColumnChunkIterator {
1200    type Item = Result<Box<dyn PageReader>>;
1201
1202    fn next(&mut self) -> Option<Self::Item> {
1203        self.reader.take()
1204    }
1205}
1206
1207impl PageIterator for ColumnChunkIterator {}
1208
1209#[cfg(test)]
1210mod tests {
1211    use super::*;
1212    use crate::arrow::arrow_reader::{
1213        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1214    };
1215    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1216    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1217    use crate::arrow::ArrowWriter;
1218    use crate::file::metadata::ParquetMetaDataReader;
1219    use crate::file::properties::WriterProperties;
1220    use arrow::compute::kernels::cmp::eq;
1221    use arrow::error::Result as ArrowResult;
1222    use arrow_array::builder::{ListBuilder, StringBuilder};
1223    use arrow_array::cast::AsArray;
1224    use arrow_array::types::Int32Type;
1225    use arrow_array::{
1226        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1227        StructArray, UInt64Array,
1228    };
1229    use arrow_schema::{DataType, Field, Schema};
1230    use futures::{StreamExt, TryStreamExt};
1231    use rand::{rng, Rng};
1232    use std::collections::HashMap;
1233    use std::sync::{Arc, Mutex};
1234    use tempfile::tempfile;
1235
1236    #[derive(Clone)]
1237    struct TestReader {
1238        data: Bytes,
1239        metadata: Option<Arc<ParquetMetaData>>,
1240        requests: Arc<Mutex<Vec<Range<usize>>>>,
1241    }
1242
1243    impl TestReader {
1244        fn new(data: Bytes) -> Self {
1245            Self {
1246                data,
1247                metadata: Default::default(),
1248                requests: Default::default(),
1249            }
1250        }
1251    }
1252
1253    impl AsyncFileReader for TestReader {
1254        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1255            let range = range.clone();
1256            self.requests
1257                .lock()
1258                .unwrap()
1259                .push(range.start as usize..range.end as usize);
1260            futures::future::ready(Ok(self
1261                .data
1262                .slice(range.start as usize..range.end as usize)))
1263            .boxed()
1264        }
1265
1266        fn get_metadata<'a>(
1267            &'a mut self,
1268            options: Option<&'a ArrowReaderOptions>,
1269        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1270            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1271                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1272            );
1273            self.metadata = Some(Arc::new(
1274                metadata_reader.parse_and_finish(&self.data).unwrap(),
1275            ));
1276            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1277        }
1278    }
1279
1280    #[tokio::test]
1281    async fn test_async_reader() {
1282        let testdata = arrow::util::test_util::parquet_test_data();
1283        let path = format!("{testdata}/alltypes_plain.parquet");
1284        let data = Bytes::from(std::fs::read(path).unwrap());
1285
1286        let async_reader = TestReader::new(data.clone());
1287
1288        let requests = async_reader.requests.clone();
1289        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1290            .await
1291            .unwrap();
1292
1293        let metadata = builder.metadata().clone();
1294        assert_eq!(metadata.num_row_groups(), 1);
1295
1296        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1297        let stream = builder
1298            .with_projection(mask.clone())
1299            .with_batch_size(1024)
1300            .build()
1301            .unwrap();
1302
1303        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1304
1305        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1306            .unwrap()
1307            .with_projection(mask)
1308            .with_batch_size(104)
1309            .build()
1310            .unwrap()
1311            .collect::<ArrowResult<Vec<_>>>()
1312            .unwrap();
1313
1314        assert_eq!(async_batches, sync_batches);
1315
1316        let requests = requests.lock().unwrap();
1317        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1318        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1319
1320        assert_eq!(
1321            &requests[..],
1322            &[
1323                offset_1 as usize..(offset_1 + length_1) as usize,
1324                offset_2 as usize..(offset_2 + length_2) as usize
1325            ]
1326        );
1327    }
1328
1329    #[tokio::test]
1330    async fn test_async_reader_with_next_row_group() {
1331        let testdata = arrow::util::test_util::parquet_test_data();
1332        let path = format!("{testdata}/alltypes_plain.parquet");
1333        let data = Bytes::from(std::fs::read(path).unwrap());
1334
1335        let async_reader = TestReader::new(data.clone());
1336
1337        let requests = async_reader.requests.clone();
1338        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1339            .await
1340            .unwrap();
1341
1342        let metadata = builder.metadata().clone();
1343        assert_eq!(metadata.num_row_groups(), 1);
1344
1345        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1346        let mut stream = builder
1347            .with_projection(mask.clone())
1348            .with_batch_size(1024)
1349            .build()
1350            .unwrap();
1351
1352        let mut readers = vec![];
1353        while let Some(reader) = stream.next_row_group().await.unwrap() {
1354            readers.push(reader);
1355        }
1356
1357        let async_batches: Vec<_> = readers
1358            .into_iter()
1359            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1360            .collect();
1361
1362        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1363            .unwrap()
1364            .with_projection(mask)
1365            .with_batch_size(104)
1366            .build()
1367            .unwrap()
1368            .collect::<ArrowResult<Vec<_>>>()
1369            .unwrap();
1370
1371        assert_eq!(async_batches, sync_batches);
1372
1373        let requests = requests.lock().unwrap();
1374        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1375        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1376
1377        assert_eq!(
1378            &requests[..],
1379            &[
1380                offset_1 as usize..(offset_1 + length_1) as usize,
1381                offset_2 as usize..(offset_2 + length_2) as usize
1382            ]
1383        );
1384    }
1385
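    // Reads with the page index enabled and verifies that the column and offset indexes
    // are loaded for every row group and column before comparing async and sync results.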
1386    #[tokio::test]
1387    async fn test_async_reader_with_index() {
1388        let testdata = arrow::util::test_util::parquet_test_data();
1389        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1390        let data = Bytes::from(std::fs::read(path).unwrap());
1391
1392        let async_reader = TestReader::new(data.clone());
1393
1394        let options = ArrowReaderOptions::new().with_page_index(true);
1395        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1396            .await
1397            .unwrap();
1398
1399        // The builder should now have the column and offset indexes loaded
1400        let metadata_with_index = builder.metadata();
1401        assert_eq!(metadata_with_index.num_row_groups(), 1);
1402
1403        // Both the offset and column indexes should be present, with one entry per row group
1404        let offset_index = metadata_with_index.offset_index().unwrap();
1405        let column_index = metadata_with_index.column_index().unwrap();
1406
1407        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1408        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1409
1410        let num_columns = metadata_with_index
1411            .file_metadata()
1412            .schema_descr()
1413            .num_columns();
1414
1415        // Each row group's index entries should cover every column
1416        offset_index
1417            .iter()
1418            .for_each(|x| assert_eq!(x.len(), num_columns));
1419        column_index
1420            .iter()
1421            .for_each(|x| assert_eq!(x.len(), num_columns));
1422
1423        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1424        let stream = builder
1425            .with_projection(mask.clone())
1426            .with_batch_size(1024)
1427            .build()
1428            .unwrap();
1429
1430        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1431
1432        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1433            .unwrap()
1434            .with_projection(mask)
1435            .with_batch_size(1024)
1436            .build()
1437            .unwrap()
1438            .collect::<ArrowResult<Vec<_>>>()
1439            .unwrap();
1440
1441        assert_eq!(async_batches, sync_batches);
1442    }
1443
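    // `with_limit(1)` should produce identical results for the async and sync readers.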
1444    #[tokio::test]
1445    async fn test_async_reader_with_limit() {
1446        let testdata = arrow::util::test_util::parquet_test_data();
1447        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1448        let data = Bytes::from(std::fs::read(path).unwrap());
1449
1450        let metadata = ParquetMetaDataReader::new()
1451            .parse_and_finish(&data)
1452            .unwrap();
1453        let metadata = Arc::new(metadata);
1454
1455        assert_eq!(metadata.num_row_groups(), 1);
1456
1457        let async_reader = TestReader::new(data.clone());
1458
1459        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1460            .await
1461            .unwrap();
1462
1463        assert_eq!(builder.metadata().num_row_groups(), 1);
1464
1465        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1466        let stream = builder
1467            .with_projection(mask.clone())
1468            .with_batch_size(1024)
1469            .with_limit(1)
1470            .build()
1471            .unwrap();
1472
1473        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1474
1475        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1476            .unwrap()
1477            .with_projection(mask)
1478            .with_batch_size(1024)
1479            .with_limit(1)
1480            .build()
1481            .unwrap()
1482            .collect::<ArrowResult<Vec<_>>>()
1483            .unwrap();
1484
1485        assert_eq!(async_batches, sync_batches);
1486    }
1487
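    // Applies a `RowSelection` that skips and selects across page boundaries and compares
    // the async result against the sync reader.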
1488    #[tokio::test]
1489    async fn test_async_reader_skip_pages() {
1490        let testdata = arrow::util::test_util::parquet_test_data();
1491        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1492        let data = Bytes::from(std::fs::read(path).unwrap());
1493
1494        let async_reader = TestReader::new(data.clone());
1495
1496        let options = ArrowReaderOptions::new().with_page_index(true);
1497        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1498            .await
1499            .unwrap();
1500
1501        assert_eq!(builder.metadata().num_row_groups(), 1);
1502
1503        let selection = RowSelection::from(vec![
1504            RowSelector::skip(21),   // Skip first page
1505            RowSelector::select(21), // Select page to boundary
1506            RowSelector::skip(41),   // Skip multiple pages
1507            RowSelector::select(41), // Select multiple pages
1508            RowSelector::skip(25),   // Skip page across boundary
1509            RowSelector::select(25), // Select across page boundary
1510            RowSelector::skip(7116), // Skip to final page boundary
1511            RowSelector::select(10), // Select final page
1512        ]);
1513
1514        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1515
1516        let stream = builder
1517            .with_projection(mask.clone())
1518            .with_row_selection(selection.clone())
1519            .build()
1520            .expect("building stream");
1521
1522        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1523
1524        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1525            .unwrap()
1526            .with_projection(mask)
1527            .with_batch_size(1024)
1528            .with_row_selection(selection)
1529            .build()
1530            .unwrap()
1531            .collect::<ArrowResult<Vec<_>>>()
1532            .unwrap();
1533
1534        assert_eq!(async_batches, sync_batches);
1535    }
1536
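    // Fuzz test: random alternating skip/select runs over a random column should always
    // yield exactly the number of selected rows.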
1537    #[tokio::test]
1538    async fn test_fuzz_async_reader_selection() {
1539        let testdata = arrow::util::test_util::parquet_test_data();
1540        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1541        let data = Bytes::from(std::fs::read(path).unwrap());
1542
1543        let mut rand = rng();
1544
1545        for _ in 0..100 {
1546            let mut expected_rows = 0;
1547            let mut total_rows = 0;
1548            let mut skip = false;
1549            let mut selectors = vec![];
1550
1551            while total_rows < 7300 {
1552                let row_count: usize = rand.random_range(1..100);
1553
1554                let row_count = row_count.min(7300 - total_rows);
1555
1556                selectors.push(RowSelector { row_count, skip });
1557
1558                total_rows += row_count;
1559                if !skip {
1560                    expected_rows += row_count;
1561                }
1562
1563                skip = !skip;
1564            }
1565
1566            let selection = RowSelection::from(selectors);
1567
1568            let async_reader = TestReader::new(data.clone());
1569
1570            let options = ArrowReaderOptions::new().with_page_index(true);
1571            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1572                .await
1573                .unwrap();
1574
1575            assert_eq!(builder.metadata().num_row_groups(), 1);
1576
1577            let col_idx: usize = rand.random_range(0..13);
1578            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1579
1580            let stream = builder
1581                .with_projection(mask.clone())
1582                .with_row_selection(selection.clone())
1583                .build()
1584                .expect("building stream");
1585
1586            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1587
1588            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1589
1590            assert_eq!(actual_rows, expected_rows);
1591        }
1592    }
1593
1594    #[tokio::test]
1595    async fn test_async_reader_zero_row_selector() {
1596        // See https://github.com/apache/arrow-rs/issues/2669
1597        let testdata = arrow::util::test_util::parquet_test_data();
1598        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1599        let data = Bytes::from(std::fs::read(path).unwrap());
1600
1601        let mut rand = rng();
1602
1603        let mut expected_rows = 0;
1604        let mut total_rows = 0;
1605        let mut skip = false;
1606        let mut selectors = vec![];
1607
1608        selectors.push(RowSelector {
1609            row_count: 0,
1610            skip: false,
1611        });
1612
1613        while total_rows < 7300 {
1614            let row_count: usize = rand.random_range(1..100);
1615
1616            let row_count = row_count.min(7300 - total_rows);
1617
1618            selectors.push(RowSelector { row_count, skip });
1619
1620            total_rows += row_count;
1621            if !skip {
1622                expected_rows += row_count;
1623            }
1624
1625            skip = !skip;
1626        }
1627
1628        let selection = RowSelection::from(selectors);
1629
1630        let async_reader = TestReader::new(data.clone());
1631
1632        let options = ArrowReaderOptions::new().with_page_index(true);
1633        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1634            .await
1635            .unwrap();
1636
1637        assert_eq!(builder.metadata().num_row_groups(), 1);
1638
1639        let col_idx: usize = rand.random_range(0..13);
1640        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1641
1642        let stream = builder
1643            .with_projection(mask.clone())
1644            .with_row_selection(selection.clone())
1645            .build()
1646            .expect("building stream");
1647
1648        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1649
1650        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1651
1652        assert_eq!(actual_rows, expected_rows);
1653    }
1654
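    // A single predicate on column "a" should filter the rows and require only two I/O
    // requests: one for the predicate column and one for the projection.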
1655    #[tokio::test]
1656    async fn test_row_filter() {
1657        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1658        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1659        let data = RecordBatch::try_from_iter([
1660            ("a", Arc::new(a) as ArrayRef),
1661            ("b", Arc::new(b) as ArrayRef),
1662        ])
1663        .unwrap();
1664
1665        let mut buf = Vec::with_capacity(1024);
1666        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1667        writer.write(&data).unwrap();
1668        writer.close().unwrap();
1669
1670        let data: Bytes = buf.into();
1671        let metadata = ParquetMetaDataReader::new()
1672            .parse_and_finish(&data)
1673            .unwrap();
1674        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1675
1676        let test = TestReader::new(data);
1677        let requests = test.requests.clone();
1678
1679        let a_scalar = StringArray::from_iter_values(["b"]);
1680        let a_filter = ArrowPredicateFn::new(
1681            ProjectionMask::leaves(&parquet_schema, vec![0]),
1682            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1683        );
1684
1685        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1686
1687        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1688        let stream = ParquetRecordBatchStreamBuilder::new(test)
1689            .await
1690            .unwrap()
1691            .with_projection(mask.clone())
1692            .with_batch_size(1024)
1693            .with_row_filter(filter)
1694            .build()
1695            .unwrap();
1696
1697        let batches: Vec<_> = stream.try_collect().await.unwrap();
1698        assert_eq!(batches.len(), 1);
1699
1700        let batch = &batches[0];
1701        assert_eq!(batch.num_columns(), 2);
1702
1703        // Filter should have kept only rows with "b" in column 0
1704        assert_eq!(
1705            batch.column(0).as_ref(),
1706            &StringArray::from_iter_values(["b", "b", "b"])
1707        );
1708        assert_eq!(
1709            batch.column(1).as_ref(),
1710            &StringArray::from_iter_values(["2", "3", "4"])
1711        );
1712
1713        // Should only have made 2 requests:
1714        // * First request fetches data for evaluating the predicate
1715        // * Second request fetches data for evaluating the projection
1716        assert_eq!(requests.lock().unwrap().len(), 2);
1717    }
1718
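    // Two chained predicates should each trigger their own fetch, plus one more fetch
    // for the final projection.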
1719    #[tokio::test]
1720    async fn test_two_row_filters() {
1721        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1722        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1723        let c = Int32Array::from_iter(0..6);
1724        let data = RecordBatch::try_from_iter([
1725            ("a", Arc::new(a) as ArrayRef),
1726            ("b", Arc::new(b) as ArrayRef),
1727            ("c", Arc::new(c) as ArrayRef),
1728        ])
1729        .unwrap();
1730
1731        let mut buf = Vec::with_capacity(1024);
1732        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1733        writer.write(&data).unwrap();
1734        writer.close().unwrap();
1735
1736        let data: Bytes = buf.into();
1737        let metadata = ParquetMetaDataReader::new()
1738            .parse_and_finish(&data)
1739            .unwrap();
1740        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1741
1742        let test = TestReader::new(data);
1743        let requests = test.requests.clone();
1744
1745        let a_scalar = StringArray::from_iter_values(["b"]);
1746        let a_filter = ArrowPredicateFn::new(
1747            ProjectionMask::leaves(&parquet_schema, vec![0]),
1748            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1749        );
1750
1751        let b_scalar = StringArray::from_iter_values(["4"]);
1752        let b_filter = ArrowPredicateFn::new(
1753            ProjectionMask::leaves(&parquet_schema, vec![1]),
1754            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1755        );
1756
1757        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1758
1759        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1760        let stream = ParquetRecordBatchStreamBuilder::new(test)
1761            .await
1762            .unwrap()
1763            .with_projection(mask.clone())
1764            .with_batch_size(1024)
1765            .with_row_filter(filter)
1766            .build()
1767            .unwrap();
1768
1769        let batches: Vec<_> = stream.try_collect().await.unwrap();
1770        assert_eq!(batches.len(), 1);
1771
1772        let batch = &batches[0];
1773        assert_eq!(batch.num_rows(), 1);
1774        assert_eq!(batch.num_columns(), 2);
1775
1776        let col = batch.column(0);
1777        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1778        assert_eq!(val, "b");
1779
1780        let col = batch.column(1);
1781        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1782        assert_eq!(val, 3);
1783
1784        // Should only have made 3 requests
1785        // * First request fetches data for evaluating the first predicate
1786        // * Second request fetches data for evaluating the second predicate
1787        // * Third request fetches data for evaluating the projection
1788        assert_eq!(requests.lock().unwrap().len(), 3);
1789    }
1790
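    // Exercises `with_limit` and `with_offset` across two row groups of three rows each.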
1791    #[tokio::test]
1792    async fn test_limit_multiple_row_groups() {
1793        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1794        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1795        let c = Int32Array::from_iter(0..6);
1796        let data = RecordBatch::try_from_iter([
1797            ("a", Arc::new(a) as ArrayRef),
1798            ("b", Arc::new(b) as ArrayRef),
1799            ("c", Arc::new(c) as ArrayRef),
1800        ])
1801        .unwrap();
1802
1803        let mut buf = Vec::with_capacity(1024);
1804        let props = WriterProperties::builder()
1805            .set_max_row_group_size(3)
1806            .build();
1807        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1808        writer.write(&data).unwrap();
1809        writer.close().unwrap();
1810
1811        let data: Bytes = buf.into();
1812        let metadata = ParquetMetaDataReader::new()
1813            .parse_and_finish(&data)
1814            .unwrap();
1815
1816        assert_eq!(metadata.num_row_groups(), 2);
1817
1818        let test = TestReader::new(data);
1819
1820        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1821            .await
1822            .unwrap()
1823            .with_batch_size(1024)
1824            .with_limit(4)
1825            .build()
1826            .unwrap();
1827
1828        let batches: Vec<_> = stream.try_collect().await.unwrap();
1829        // Expect one batch for each row group
1830        assert_eq!(batches.len(), 2);
1831
1832        let batch = &batches[0];
1833        // First batch should contain all rows of the first row group
1834        assert_eq!(batch.num_rows(), 3);
1835        assert_eq!(batch.num_columns(), 3);
1836        let col2 = batch.column(2).as_primitive::<Int32Type>();
1837        assert_eq!(col2.values(), &[0, 1, 2]);
1838
1839        let batch = &batches[1];
1840        // Second batch should trigger the limit and only have one row
1841        assert_eq!(batch.num_rows(), 1);
1842        assert_eq!(batch.num_columns(), 3);
1843        let col2 = batch.column(2).as_primitive::<Int32Type>();
1844        assert_eq!(col2.values(), &[3]);
1845
1846        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1847            .await
1848            .unwrap()
1849            .with_offset(2)
1850            .with_limit(3)
1851            .build()
1852            .unwrap();
1853
1854        let batches: Vec<_> = stream.try_collect().await.unwrap();
1855        // Expect one batch for each row group
1856        assert_eq!(batches.len(), 2);
1857
1858        let batch = &batches[0];
1859        // First batch should contain one row
1860        assert_eq!(batch.num_rows(), 1);
1861        assert_eq!(batch.num_columns(), 3);
1862        let col2 = batch.column(2).as_primitive::<Int32Type>();
1863        assert_eq!(col2.values(), &[2]);
1864
1865        let batch = &batches[1];
1866        // Second batch should contain two rows
1867        assert_eq!(batch.num_rows(), 2);
1868        assert_eq!(batch.num_columns(), 3);
1869        let col2 = batch.column(2).as_primitive::<Int32Type>();
1870        assert_eq!(col2.values(), &[3, 4]);
1871
1872        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1873            .await
1874            .unwrap()
1875            .with_offset(4)
1876            .with_limit(20)
1877            .build()
1878            .unwrap();
1879
1880        let batches: Vec<_> = stream.try_collect().await.unwrap();
1881        // Should skip first row group
1882        assert_eq!(batches.len(), 1);
1883
1884        let batch = &batches[0];
1885        // First batch should contain two rows
1886        assert_eq!(batch.num_rows(), 2);
1887        assert_eq!(batch.num_columns(), 3);
1888        let col2 = batch.column(2).as_primitive::<Int32Type>();
1889        assert_eq!(col2.values(), &[4, 5]);
1890    }
1891
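    // Combines two row filters with the page index enabled and checks the total number
    // of selected rows.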
1892    #[tokio::test]
1893    async fn test_row_filter_with_index() {
1894        let testdata = arrow::util::test_util::parquet_test_data();
1895        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1896        let data = Bytes::from(std::fs::read(path).unwrap());
1897
1898        let metadata = ParquetMetaDataReader::new()
1899            .parse_and_finish(&data)
1900            .unwrap();
1901        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1902
1903        assert_eq!(metadata.num_row_groups(), 1);
1904
1905        let async_reader = TestReader::new(data.clone());
1906
1907        let a_filter =
1908            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1909                Ok(batch.column(0).as_boolean().clone())
1910            });
1911
1912        let b_scalar = Int8Array::from(vec![2]);
1913        let b_filter = ArrowPredicateFn::new(
1914            ProjectionMask::leaves(&parquet_schema, vec![2]),
1915            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1916        );
1917
1918        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1919
1920        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1921
1922        let options = ArrowReaderOptions::new().with_page_index(true);
1923        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1924            .await
1925            .unwrap()
1926            .with_projection(mask.clone())
1927            .with_batch_size(1024)
1928            .with_row_filter(filter)
1929            .build()
1930            .unwrap();
1931
1932        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1933
1934        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1935
1936        assert_eq!(total_rows, 730);
1937    }
1938
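    // Builds a `ReaderFactory` directly and verifies that a sparse `RowSelection` only
    // fetches the byte ranges of the selected pages.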
1939    #[tokio::test]
1940    #[allow(deprecated)]
1941    async fn test_in_memory_row_group_sparse() {
1942        let testdata = arrow::util::test_util::parquet_test_data();
1943        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1944        let data = Bytes::from(std::fs::read(path).unwrap());
1945
1946        let metadata = ParquetMetaDataReader::new()
1947            .with_page_indexes(true)
1948            .parse_and_finish(&data)
1949            .unwrap();
1950
1951        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1952
1953        let mut metadata_builder = metadata.into_builder();
1954        let mut row_groups = metadata_builder.take_row_groups();
1955        row_groups.truncate(1);
1956        let row_group_meta = row_groups.pop().unwrap();
1957
1958        let metadata = metadata_builder
1959            .add_row_group(row_group_meta)
1960            .set_column_index(None)
1961            .set_offset_index(Some(vec![offset_index.clone()]))
1962            .build();
1963
1964        let metadata = Arc::new(metadata);
1965
1966        let num_rows = metadata.row_group(0).num_rows();
1967
1968        assert_eq!(metadata.num_row_groups(), 1);
1969
1970        let async_reader = TestReader::new(data.clone());
1971
1972        let requests = async_reader.requests.clone();
1973        let (_, fields) = parquet_to_arrow_schema_and_fields(
1974            metadata.file_metadata().schema_descr(),
1975            ProjectionMask::all(),
1976            None,
1977        )
1978        .unwrap();
1979
1980        let _schema_desc = metadata.file_metadata().schema_descr();
1981
1982        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1983
1984        let reader_factory = ReaderFactory {
1985            metadata,
1986            fields: fields.map(Arc::new),
1987            input: async_reader,
1988            filter: None,
1989            limit: None,
1990            offset: None,
1991            metrics: ArrowReaderMetrics::disabled(),
1992            max_predicate_cache_size: 0,
1993        };
1994
1995        let mut skip = true;
1996        let mut pages = offset_index[0].page_locations.iter().peekable();
1997
1998        // Set up `RowSelection` so that we can skip every other page, selecting the last page
1999        let mut selectors = vec![];
2000        let mut expected_page_requests: Vec<Range<usize>> = vec![];
2001        while let Some(page) = pages.next() {
2002            let num_rows = if let Some(next_page) = pages.peek() {
2003                next_page.first_row_index - page.first_row_index
2004            } else {
2005                num_rows - page.first_row_index
2006            };
2007
2008            if skip {
2009                selectors.push(RowSelector::skip(num_rows as usize));
2010            } else {
2011                selectors.push(RowSelector::select(num_rows as usize));
2012                let start = page.offset as usize;
2013                let end = start + page.compressed_page_size as usize;
2014                expected_page_requests.push(start..end);
2015            }
2016            skip = !skip;
2017        }
2018
2019        let selection = RowSelection::from(selectors);
2020
2021        let (_factory, _reader) = reader_factory
2022            .read_row_group(0, Some(selection), projection.clone(), 48)
2023            .await
2024            .expect("reading row group");
2025
2026        let requests = requests.lock().unwrap();
2027
2028        assert_eq!(&requests[..], &expected_page_requests)
2029    }
2030
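    // The stream should clamp the configured batch size to the number of rows in the file.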
2031    #[tokio::test]
2032    async fn test_batch_size_overallocate() {
2033        let testdata = arrow::util::test_util::parquet_test_data();
2034        // `alltypes_plain.parquet` only has 8 rows
2035        let path = format!("{testdata}/alltypes_plain.parquet");
2036        let data = Bytes::from(std::fs::read(path).unwrap());
2037
2038        let async_reader = TestReader::new(data.clone());
2039
2040        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2041            .await
2042            .unwrap();
2043
2044        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2045
2046        let stream = builder
2047            .with_projection(ProjectionMask::all())
2048            .with_batch_size(1024)
2049            .build()
2050            .unwrap();
2051        assert_ne!(1024, file_rows);
2052        assert_eq!(stream.batch_size, file_rows);
2053    }
2054
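    // Reads a bloom filter from a file whose metadata does not store `bloom_filter_length`.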
2055    #[tokio::test]
2056    async fn test_get_row_group_column_bloom_filter_without_length() {
2057        let testdata = arrow::util::test_util::parquet_test_data();
2058        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2059        let data = Bytes::from(std::fs::read(path).unwrap());
2060        test_get_row_group_column_bloom_filter(data, false).await;
2061    }
2062
2063    #[tokio::test]
2064    async fn test_parquet_record_batch_stream_schema() {
2065        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2066            schema.flattened_fields().iter().map(|f| f.name()).collect()
2067        }
2068
2069        // ParquetRecordBatchReaderBuilder::schema differs from
2070        // ParquetRecordBatchReader::schema and RecordBatch::schema: the builder keeps
2071        // the custom schema metadata and all fields, while the reader and batch expose
2072        // only the projected fields and drop the metadata. This test ensures that
2073        // behaviour remains consistent.
2074        // The asynchronous readers must behave the same way.
2075
2076        // Prep data, for a schema with nested fields, with custom metadata
2077        let mut metadata = HashMap::with_capacity(1);
2078        metadata.insert("key".to_string(), "value".to_string());
2079
2080        let nested_struct_array = StructArray::from(vec![
2081            (
2082                Arc::new(Field::new("d", DataType::Utf8, true)),
2083                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2084            ),
2085            (
2086                Arc::new(Field::new("e", DataType::Utf8, true)),
2087                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2088            ),
2089        ]);
2090        let struct_array = StructArray::from(vec![
2091            (
2092                Arc::new(Field::new("a", DataType::Int32, true)),
2093                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2094            ),
2095            (
2096                Arc::new(Field::new("b", DataType::UInt64, true)),
2097                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2098            ),
2099            (
2100                Arc::new(Field::new(
2101                    "c",
2102                    nested_struct_array.data_type().clone(),
2103                    true,
2104                )),
2105                Arc::new(nested_struct_array) as ArrayRef,
2106            ),
2107        ]);
2108
2109        let schema =
2110            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2111        let record_batch = RecordBatch::from(struct_array)
2112            .with_schema(schema.clone())
2113            .unwrap();
2114
2115        // Write parquet with custom metadata in schema
2116        let mut file = tempfile().unwrap();
2117        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2118        writer.write(&record_batch).unwrap();
2119        writer.close().unwrap();
2120
2121        let all_fields = ["a", "b", "c", "d", "e"];
2122        // (leaf indices in the projection mask, expected field names in the projected output schema)
2123        let projections = [
2124            (vec![], vec![]),
2125            (vec![0], vec!["a"]),
2126            (vec![0, 1], vec!["a", "b"]),
2127            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2128            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2129        ];
2130
2131        // Ensure we're consistent for each of these projections
2132        for (indices, expected_projected_names) in projections {
2133            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2134                // Builder schema should preserve all fields and metadata
2135                assert_eq!(get_all_field_names(&builder), all_fields);
2136                assert_eq!(builder.metadata, metadata);
2137                // Reader & batch schema should show only projected fields, and no metadata
2138                assert_eq!(get_all_field_names(&reader), expected_projected_names);
2139                assert_eq!(reader.metadata, HashMap::default());
2140                assert_eq!(get_all_field_names(&batch), expected_projected_names);
2141                assert_eq!(batch.metadata, HashMap::default());
2142            };
2143
2144            let builder =
2145                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2146            let sync_builder_schema = builder.schema().clone();
2147            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2148            let mut reader = builder.with_projection(mask).build().unwrap();
2149            let sync_reader_schema = reader.schema();
2150            let batch = reader.next().unwrap().unwrap();
2151            let sync_batch_schema = batch.schema();
2152            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2153
2154            // The asynchronous reader should behave the same way
2155            let file = tokio::fs::File::from(file.try_clone().unwrap());
2156            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2157            let async_builder_schema = builder.schema().clone();
2158            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2159            let mut reader = builder.with_projection(mask).build().unwrap();
2160            let async_reader_schema = reader.schema().clone();
2161            let batch = reader.next().await.unwrap().unwrap();
2162            let async_batch_schema = batch.schema();
2163            assert_schemas(
2164                async_builder_schema,
2165                async_reader_schema,
2166                async_batch_schema,
2167            );
2168        }
2169    }
2170
2171    #[tokio::test]
2172    async fn test_get_row_group_column_bloom_filter_with_length() {
2173        // Rewrite the data into a new Parquet file whose metadata records the bloom filter length
2174        let testdata = arrow::util::test_util::parquet_test_data();
2175        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2176        let data = Bytes::from(std::fs::read(path).unwrap());
2177        let async_reader = TestReader::new(data.clone());
2178        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2179            .await
2180            .unwrap();
2181        let schema = builder.schema().clone();
2182        let stream = builder.build().unwrap();
2183        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2184
2185        let mut parquet_data = Vec::new();
2186        let props = WriterProperties::builder()
2187            .set_bloom_filter_enabled(true)
2188            .build();
2189        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2190        for batch in batches {
2191            writer.write(&batch).unwrap();
2192        }
2193        writer.close().unwrap();
2194
2195        // test the new parquet file
2196        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2197    }
2198
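    // Shared helper: loads the bloom filter for row group 0, column 0 and probes it for
    // a value that is present and one that is not.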
2199    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2200        let async_reader = TestReader::new(data.clone());
2201
2202        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2203            .await
2204            .unwrap();
2205
2206        let metadata = builder.metadata();
2207        assert_eq!(metadata.num_row_groups(), 1);
2208        let row_group = metadata.row_group(0);
2209        let column = row_group.column(0);
2210        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2211
2212        let sbbf = builder
2213            .get_row_group_column_bloom_filter(0, 0)
2214            .await
2215            .unwrap()
2216            .unwrap();
2217        assert!(sbbf.check(&"Hello"));
2218        assert!(!sbbf.check(&"Hello_Not_Exists"));
2219    }
2220
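    // Verifies that row selections over a file with a nested list column produce the
    // expected row counts.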
2221    #[tokio::test]
2222    async fn test_nested_skip() {
2223        let schema = Arc::new(Schema::new(vec![
2224            Field::new("col_1", DataType::UInt64, false),
2225            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2226        ]));
2227
2228        // Writer properties forcing small data pages (256 rows) and a single 1024-row row group
2229        let props = WriterProperties::builder()
2230            .set_data_page_row_count_limit(256)
2231            .set_write_batch_size(256)
2232            .set_max_row_group_size(1024);
2233
2234        // Write data
2235        let mut file = tempfile().unwrap();
2236        let mut writer =
2237            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2238
2239        let mut builder = ListBuilder::new(StringBuilder::new());
2240        for id in 0..1024 {
2241            match id % 3 {
2242                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2243                1 => builder.append_value([Some(format!("id_{id}"))]),
2244                _ => builder.append_null(),
2245            }
2246        }
2247        let refs = vec![
2248            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2249            Arc::new(builder.finish()) as ArrayRef,
2250        ];
2251
2252        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2253        writer.write(&batch).unwrap();
2254        writer.close().unwrap();
2255
2256        let selections = [
2257            RowSelection::from(vec![
2258                RowSelector::skip(313),
2259                RowSelector::select(1),
2260                RowSelector::skip(709),
2261                RowSelector::select(1),
2262            ]),
2263            RowSelection::from(vec![
2264                RowSelector::skip(255),
2265                RowSelector::select(1),
2266                RowSelector::skip(767),
2267                RowSelector::select(1),
2268            ]),
2269            RowSelection::from(vec![
2270                RowSelector::select(255),
2271                RowSelector::skip(1),
2272                RowSelector::select(767),
2273                RowSelector::skip(1),
2274            ]),
2275            RowSelection::from(vec![
2276                RowSelector::skip(254),
2277                RowSelector::select(1),
2278                RowSelector::select(1),
2279                RowSelector::skip(767),
2280                RowSelector::select(1),
2281            ]),
2282        ];
2283
2284        for selection in selections {
2285            let expected = selection.row_count();
2286            // Read data
2287            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2288                tokio::fs::File::from_std(file.try_clone().unwrap()),
2289                ArrowReaderOptions::new().with_page_index(true),
2290            )
2291            .await
2292            .unwrap();
2293
2294            reader = reader.with_row_selection(selection);
2295
2296            let mut stream = reader.build().unwrap();
2297
2298            let mut total_rows = 0;
2299            while let Some(rb) = stream.next().await {
2300                let rb = rb.unwrap();
2301                total_rows += rb.num_rows();
2302            }
2303            assert_eq!(total_rows, expected);
2304        }
2305    }
2306
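    // Like `test_two_row_filters`, but the second predicate projects a leaf inside a
    // nested struct column.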
2307    #[tokio::test]
2308    async fn test_row_filter_nested() {
2309        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2310        let b = StructArray::from(vec![
2311            (
2312                Arc::new(Field::new("aa", DataType::Utf8, true)),
2313                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2314            ),
2315            (
2316                Arc::new(Field::new("bb", DataType::Utf8, true)),
2317                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2318            ),
2319        ]);
2320        let c = Int32Array::from_iter(0..6);
2321        let data = RecordBatch::try_from_iter([
2322            ("a", Arc::new(a) as ArrayRef),
2323            ("b", Arc::new(b) as ArrayRef),
2324            ("c", Arc::new(c) as ArrayRef),
2325        ])
2326        .unwrap();
2327
2328        let mut buf = Vec::with_capacity(1024);
2329        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2330        writer.write(&data).unwrap();
2331        writer.close().unwrap();
2332
2333        let data: Bytes = buf.into();
2334        let metadata = ParquetMetaDataReader::new()
2335            .parse_and_finish(&data)
2336            .unwrap();
2337        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2338
2339        let test = TestReader::new(data);
2340        let requests = test.requests.clone();
2341
2342        let a_scalar = StringArray::from_iter_values(["b"]);
2343        let a_filter = ArrowPredicateFn::new(
2344            ProjectionMask::leaves(&parquet_schema, vec![0]),
2345            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2346        );
2347
2348        let b_scalar = StringArray::from_iter_values(["4"]);
2349        let b_filter = ArrowPredicateFn::new(
2350            ProjectionMask::leaves(&parquet_schema, vec![2]),
2351            move |batch| {
2352                // The mask projects only `b.bb`, so the struct batch has a single child column; filter on it.
2353                let struct_array = batch
2354                    .column(0)
2355                    .as_any()
2356                    .downcast_ref::<StructArray>()
2357                    .unwrap();
2358                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2359            },
2360        );
2361
2362        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2363
2364        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2365        let stream = ParquetRecordBatchStreamBuilder::new(test)
2366            .await
2367            .unwrap()
2368            .with_projection(mask.clone())
2369            .with_batch_size(1024)
2370            .with_row_filter(filter)
2371            .build()
2372            .unwrap();
2373
2374        let batches: Vec<_> = stream.try_collect().await.unwrap();
2375        assert_eq!(batches.len(), 1);
2376
2377        let batch = &batches[0];
2378        assert_eq!(batch.num_rows(), 1);
2379        assert_eq!(batch.num_columns(), 2);
2380
2381        let col = batch.column(0);
2382        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2383        assert_eq!(val, "b");
2384
2385        let col = batch.column(1);
2386        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2387        assert_eq!(val, 3);
2388
2389        // Should only have made 3 requests
2390        // * First request fetches data for evaluating the first predicate
2391        // * Second request fetches data for evaluating the second predicate
2392        // * Third request fetches data for evaluating the projection
2393        assert_eq!(requests.lock().unwrap().len(), 3);
2394    }
2395
2396    #[tokio::test]
2397    async fn test_cache_projection_excludes_nested_columns() {
2398        use arrow_array::{ArrayRef, StringArray};
2399
2400        // Build a simple RecordBatch with a flat string column `a` and a nested struct column `b { aa, bb }`
2401        let a = StringArray::from_iter_values(["r1", "r2"]);
2402        let b = StructArray::from(vec![
2403            (
2404                Arc::new(Field::new("aa", DataType::Utf8, true)),
2405                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
2406            ),
2407            (
2408                Arc::new(Field::new("bb", DataType::Utf8, true)),
2409                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
2410            ),
2411        ]);
2412
2413        let schema = Arc::new(Schema::new(vec![
2414            Field::new("a", DataType::Utf8, true),
2415            Field::new("b", b.data_type().clone(), true),
2416        ]));
2417
2418        let mut buf = Vec::new();
2419        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
2420        let batch = RecordBatch::try_from_iter([
2421            ("a", Arc::new(a) as ArrayRef),
2422            ("b", Arc::new(b) as ArrayRef),
2423        ])
2424        .unwrap();
2425        writer.write(&batch).unwrap();
2426        writer.close().unwrap();
2427
2428        // Load Parquet metadata
2429        let data: Bytes = buf.into();
2430        let metadata = ParquetMetaDataReader::new()
2431            .parse_and_finish(&data)
2432            .unwrap();
2433        let metadata = Arc::new(metadata);
2434
2435        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
2436        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
2437        let parquet_schema = metadata.file_metadata().schema_descr();
2438        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
2439
2440        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
2441            Ok(arrow_array::BooleanArray::from(vec![
2442                true;
2443                batch.num_rows()
2444            ]))
2445        });
2446        let filter = RowFilter::new(vec![Box::new(always_true)]);
2447
2448        // Construct a ReaderFactory and compute cache projection
2449        let reader_factory = ReaderFactory {
2450            metadata: Arc::clone(&metadata),
2451            fields: None,
2452            input: TestReader::new(data),
2453            filter: Some(filter),
2454            limit: None,
2455            offset: None,
2456            metrics: ArrowReaderMetrics::disabled(),
2457            max_predicate_cache_size: 0,
2458        };
2459
2460        // Provide an output projection that also selects the same nested leaf
2461        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
2462
2463        // Expect None since nested columns should be excluded from cache projection
2464        assert!(cache_projection.is_none());
2465    }
2466
2467    #[tokio::test]
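    // Regression test: an explicitly empty offset index must not cause a panic when
    // reading a row group.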
2468    #[allow(deprecated)]
2469    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2470        use tokio::fs::File;
2471        let testdata = arrow::util::test_util::parquet_test_data();
2472        let path = format!("{testdata}/alltypes_plain.parquet");
2473        let mut file = File::open(&path).await.unwrap();
2474        let file_size = file.metadata().await.unwrap().len();
2475        let mut metadata = ParquetMetaDataReader::new()
2476            .with_page_indexes(true)
2477            .load_and_finish(&mut file, file_size)
2478            .await
2479            .unwrap();
2480
2481        metadata.set_offset_index(Some(vec![]));
2482        let options = ArrowReaderOptions::new().with_page_index(true);
2483        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2484        let reader =
2485            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2486                .build()
2487                .unwrap();
2488
2489        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2490        assert_eq!(result.len(), 1);
2491    }
2492
2493    #[tokio::test]
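    // Companion regression test using a file whose offset index is populated.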
2494    #[allow(deprecated)]
2495    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2496        use tokio::fs::File;
2497        let testdata = arrow::util::test_util::parquet_test_data();
2498        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2499        let mut file = File::open(&path).await.unwrap();
2500        let file_size = file.metadata().await.unwrap().len();
2501        let metadata = ParquetMetaDataReader::new()
2502            .with_page_indexes(true)
2503            .load_and_finish(&mut file, file_size)
2504            .await
2505            .unwrap();
2506
2507        let options = ArrowReaderOptions::new().with_page_index(true);
2508        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2509        let reader =
2510            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2511                .build()
2512                .unwrap();
2513
2514        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2515        assert_eq!(result.len(), 8);
2516    }
2517
2518    #[tokio::test]
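    // Regression test: metadata round-tripped through `ParquetMetaDataWriter` must not
    // panic when reading the column chunks.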
2519    #[allow(deprecated)]
2520    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2521        use tempfile::TempDir;
2522        use tokio::fs::File;
2523        fn write_metadata_to_local_file(
2524            metadata: ParquetMetaData,
2525            file: impl AsRef<std::path::Path>,
2526        ) {
2527            use crate::file::metadata::ParquetMetaDataWriter;
2528            use std::fs::File;
2529            let file = File::create(file).unwrap();
2530            ParquetMetaDataWriter::new(file, &metadata)
2531                .finish()
2532                .unwrap()
2533        }
2534
2535        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2536            use std::fs::File;
2537            let file = File::open(file).unwrap();
2538            ParquetMetaDataReader::new()
2539                .with_page_indexes(true)
2540                .parse_and_finish(&file)
2541                .unwrap()
2542        }
2543
2544        let testdata = arrow::util::test_util::parquet_test_data();
2545        let path = format!("{testdata}/alltypes_plain.parquet");
2546        let mut file = File::open(&path).await.unwrap();
2547        let file_size = file.metadata().await.unwrap().len();
2548        let metadata = ParquetMetaDataReader::new()
2549            .with_page_indexes(true)
2550            .load_and_finish(&mut file, file_size)
2551            .await
2552            .unwrap();
2553
2554        let tempdir = TempDir::new().unwrap();
2555        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2556        write_metadata_to_local_file(metadata, &metadata_path);
2557        let metadata = read_metadata_from_local_file(&metadata_path);
2558
2559        let options = ArrowReaderOptions::new().with_page_index(true);
2560        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2561        let reader =
2562            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2563                .build()
2564                .unwrap();
2565
2566        // Regression point: collecting the stream must not panic here
2567        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2568        assert_eq!(result.len(), 1);
2569    }
2570
2571    #[tokio::test]
2572    async fn test_cached_array_reader_sparse_offset_error() {
2573        use futures::TryStreamExt;
2574
2575        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2576        use arrow_array::{BooleanArray, RecordBatch};
2577
2578        let testdata = arrow::util::test_util::parquet_test_data();
2579        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2580        let data = Bytes::from(std::fs::read(path).unwrap());
2581
2582        let async_reader = TestReader::new(data);
2583
2584        // Enable page index so the fetch logic loads only required pages
2585        let options = ArrowReaderOptions::new().with_page_index(true);
2586        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2587            .await
2588            .unwrap();
2589
2590        // Skip the first 22 rows (entire first Parquet page) and then select the
2591        // next 3 rows (22, 23, 24). This means the fetch step will not include
2592        // the first page starting at file offset 0.
2593        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2594
2595        // Trivial predicate on column 0 that always returns `true`. Using the
2596        // same column in both predicate and projection activates the caching
2597        // layer (Producer/Consumer pattern).
2598        let parquet_schema = builder.parquet_schema();
2599        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2600        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2601            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2602        });
2603        let filter = RowFilter::new(vec![Box::new(always_true)]);
2604
2605        // Build the stream with batch size 8 so the cache reads whole batches
2606        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2607        let stream = builder
2608            .with_batch_size(8)
2609            .with_projection(proj)
2610            .with_row_selection(selection)
2611            .with_row_filter(filter)
2612            .build()
2613            .unwrap();
2614
2615        // Collecting the stream exercises the sparse column chunk offset path and
2616        // must complete without error.
2617        let _result: Vec<_> = stream.try_collect().await.unwrap();
2618    }
2619
2620    #[tokio::test]
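    // Compares request counts and fetched bytes with the predicate cache enabled (the
    // default) and disabled via `with_max_predicate_cache_size(0)`.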
2621    async fn test_predicate_cache_disabled() {
2622        let k = Int32Array::from_iter_values(0..10);
2623        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2624
2625        let mut buf = Vec::new();
2626        // both the page row limit and batch size are set to 1 to create one page per row
2627        let props = WriterProperties::builder()
2628            .set_data_page_row_count_limit(1)
2629            .set_write_batch_size(1)
2630            .set_max_row_group_size(10)
2631            .set_write_page_header_statistics(true)
2632            .build();
2633        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2634        writer.write(&data).unwrap();
2635        writer.close().unwrap();
2636
2637        let data = Bytes::from(buf);
2638        let metadata = ParquetMetaDataReader::new()
2639            .with_page_index_policy(PageIndexPolicy::Required)
2640            .parse_and_finish(&data)
2641            .unwrap();
2642        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2643
2644        // `RowFilter` is not `Clone`, so build a fresh filter for each reader via a closure
2645        let build_filter = || {
2646            let scalar = Int32Array::from_iter_values([5]);
2647            let predicate = ArrowPredicateFn::new(
2648                ProjectionMask::leaves(&parquet_schema, vec![0]),
2649                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2650            );
2651            RowFilter::new(vec![Box::new(predicate)])
2652        };
2653
2654        // select only one of the pages
2655        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2656
2657        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2658        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2659
2660        // using the predicate cache (default)
2661        let reader_with_cache = TestReader::new(data.clone());
2662        let requests_with_cache = reader_with_cache.requests.clone();
2663        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2664            reader_with_cache,
2665            reader_metadata.clone(),
2666        )
2667        .with_batch_size(1000)
2668        .with_row_selection(selection.clone())
2669        .with_row_filter(build_filter())
2670        .build()
2671        .unwrap();
2672        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2673
2674        // disabling the predicate cache
2675        let reader_without_cache = TestReader::new(data);
2676        let requests_without_cache = reader_without_cache.requests.clone();
2677        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2678            reader_without_cache,
2679            reader_metadata,
2680        )
2681        .with_batch_size(1000)
2682        .with_row_selection(selection)
2683        .with_row_filter(build_filter())
2684        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
2685        .build()
2686        .unwrap();
2687        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2688
2689        assert_eq!(batches_with_cache, batches_without_cache);
2690
2691        let requests_with_cache = requests_with_cache.lock().unwrap();
2692        let requests_without_cache = requests_without_cache.lock().unwrap();
2693
2694        // fewer requests are made without the predicate cache
2695        assert_eq!(requests_with_cache.len(), 11);
2696        assert_eq!(requests_without_cache.len(), 2);
2697
2698        // fewer bytes are retrieved without the predicate cache
2699        assert_eq!(
2700            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2701            433
2702        );
2703        assert_eq!(
2704            requests_without_cache
2705                .iter()
2706                .map(|r| r.len())
2707                .sum::<usize>(),
2708            92
2709        );
2710    }
2711}