parquet/arrow/async_reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! `async` API for reading Parquet files as [`RecordBatch`]es
//!
//! See the [crate-level documentation](crate) for more details.
//!
//! See example on [`ParquetRecordBatchStreamBuilder::new`]

use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{
    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
};
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
    RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
///
/// Notes:
///
/// 1. There is a default implementation for types that implement [`AsyncRead`]
///    and [`AsyncSeek`], for example [`tokio::fs::File`].
///
/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
///    is enabled, implements this interface for [`ObjectStore`].
///
/// [`ObjectStore`]: object_store::ObjectStore
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
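///
/// # Example: custom implementation
///
/// A minimal sketch of a custom implementation backed by an in-memory
/// [`Bytes`] buffer (the `InMemoryReader` type is illustrative, not part of
/// this crate). A real implementation would issue I/O against local or remote
/// storage, and could also return cached or pre-fetched metadata from
/// [`AsyncFileReader::get_metadata`]:
///
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// struct InMemoryReader {
///     data: Bytes,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Serve the requested byte range directly from the in-memory buffer
///         let data = self.data.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // Parse the footer from the buffer; a cached copy could be returned instead
///         let data = self.data.clone();
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```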
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
    ///
    /// This is an asynchronous operation as it may involve reading the file
    /// footer and potentially other metadata from disk or a remote source.
    ///
    /// Reading data from Parquet requires the metadata to understand the
    /// schema, row groups, and location of pages within the file. This metadata
    /// is stored primarily in the footer of the Parquet file, and can be read using
    /// [`ParquetMetaDataReader`].
    ///
    /// However, implementations can significantly speed up reading Parquet by
    /// supplying cached metadata or pre-fetched metadata via this API.
    ///
    /// # Parameters
    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
    ///   and other options that affect how the metadata is read.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader.
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
            );

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
    /// Returns a new [`ArrowReaderMetadata`] for this builder
    ///
    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
///
/// Allows sharing the same builder for both the sync and async versions, whilst also not
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
/// This can be used to decode a Parquet file in streaming fashion (without
/// downloading the whole file at once) from a remote source, such as an object store.
///
/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc.
/// they wish to be read by the resulting stream.
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
    /// specified source.
    ///
    /// # Example
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
    /// // another async I/O reader, such as a reader from an object store.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap();
    /// // Building the stream opens the parquet file (reads metadata, etc.) and returns
    /// // a stream that can be used to incrementally read the data in batches
    /// let stream = builder.build().unwrap();
    /// // In this example, we collect the stream into a Vec<RecordBatch>
    /// // but real applications would likely process the batches as they are read
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Demonstrate the results are as expected
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///      ],
    ///  );
    /// # }
    /// ```
    ///
    /// # Example configuring options and reading metadata
    ///
    /// There are many options that control the behavior of the reader, such as
    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // As before, use tokio::fs::File to read data using async I/O.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source, in this case we set the batch size
    /// // to 3, which produces batches of up to 3 rows at a time.
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .with_batch_size(3);
    ///
    /// // We can also read the metadata to inspect the schema and other metadata
    /// // before actually reading the data
    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read columns 1, 2 and 6 (bool_col, tinyint_col and float_col)
    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
    ///
    /// let stream = builder.with_projection(mask).build().unwrap();
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Print out the results
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///         "+----------+-------------+-----------+",
    ///         "| bool_col | tinyint_col | float_col |",
    ///         "+----------+-------------+-----------+",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "+----------+-------------+-----------+",
    ///      ],
    ///  );
    ///
    /// // The results have 8 rows, so with the batch size set to 3 we expect
    /// // 3 batches: two with 3 rows each and the last one with 2 rows.
    /// assert_eq!(results.len(), 3);
    /// # }
    /// ```
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
    /// and [`ArrowReaderOptions`].
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
    ///
    /// This allows loading the metadata once and using it to create multiple builders with
    /// potentially different settings, whose streams can then be read in parallel.
    ///
    /// # Example of reading from multiple streams in parallel
    ///
    /// ```
    /// # use std::fs::metadata;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
    /// # use tempfile::tempfile;
    /// # use futures::StreamExt;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # let mut file = tempfile().unwrap();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// // open file with parquet data
    /// let mut file = tokio::fs::File::from_std(file);
    /// // load metadata once
    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
    /// // create two readers, a and b, from the same underlying file
    /// // without reading the metadata again
    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
    ///     file.try_clone().await.unwrap(),
    ///     meta.clone()
    /// ).build().unwrap();
    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
    ///
    /// // Can read batches from both readers in parallel
    /// assert_eq!(
    ///   a.next().await.unwrap().unwrap(),
    ///   b.next().await.unwrap().unwrap(),
    /// );
    /// # }
    /// ```
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
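    ///
    /// # Example
    ///
    /// A minimal sketch of row group pruning with a bloom filter. The file name,
    /// the row group / column indexes and the probed value are illustrative;
    /// whether a bloom filter is present depends on how the file was written:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    ///
    /// // Check the bloom filter (if any) of column 0 in row group 0
    /// if let Some(sbbf) = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .await
    ///     .unwrap()
    /// {
    ///     // `false` means the value is definitely absent from the row group,
    ///     // `true` means it *might* be present
    ///     if !sbbf.check(&1_i64) {
    ///         // safe to skip row group 0 for a predicate like `col0 = 1`
    ///     }
    /// }
    /// # }
    /// ```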
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`]
    ///
    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Try to avoid allocating a large buffer
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
            metrics: self.metrics,
            max_predicate_cache_size: self.max_predicate_cache_size,
        };

        // Ensure the schema of ParquetRecordBatchStream respects the projection, and does
        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}

/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
///
/// Note: If all rows are filtered out in the row group (e.g. by filters, limit or
/// offset), returns `None` for the reader.
type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
/// [`ParquetRecordBatchReader`]
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema
    fields: Option<Arc<ParquetField>>,

    input: T,

    /// Optional filter
    filter: Option<RowFilter>,

    /// Limit to apply to remaining row groups.
    limit: Option<usize>,

    /// Offset to apply to the next row group read
    offset: Option<usize>,

    /// Metrics
    metrics: ArrowReaderMetrics,

    /// Maximum size of the predicate cache
    max_predicate_cache_size: usize,
}

impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
    ///
    /// Updates the `limit` and `offset` of the reader factory
    ///
    /// Note: this captures self so that the resulting future has a static lifetime
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        // TODO: calling build_array multiple times is wasteful

        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        // Reuse columns that are selected and used by the filters
        let cache_projection = match self.compute_cache_projection(&projection) {
            Some(projection) => projection,
            None => ProjectionMask::none(meta.columns().len()),
        };
        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
            batch_size,
            self.max_predicate_cache_size,
        )));

        let mut row_group = InMemoryRowGroup {
            // schema: meta.schema_descr_ptr(),
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        let cache_options_builder =
            CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone());

        let filter = self.filter.as_mut();
        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Update selection based on any filters
        if let Some(filter) = filter {
            let cache_options = cache_options_builder.clone().producer();

            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    return Ok((self, None)); // ruled out entire row group
                }

                // (pre) Fetch only the columns that are selected by the predicate
                let selection = plan_builder.selection();
                // Fetch predicate columns; expand selection only for cached predicate columns
                let cache_mask = Some(&cache_projection);
                row_group
                    .fetch(
                        &mut self.input,
                        predicate.projection(),
                        selection,
                        batch_size,
                        cache_mask,
                    )
                    .await?;

                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_cache_options(Some(&cache_options))
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        // Compute the number of rows in the selection before applying limit and offset
        let rows_before = plan_builder
            .num_rows_selected()
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None)); // ruled out entire row group
        }

        // Apply any limit and offset
        let plan_builder = plan_builder
            .limited(row_group.row_count)
            .with_offset(self.offset)
            .with_limit(self.limit)
            .build_limited();

        let rows_after = plan_builder
            .num_rows_selected()
            .unwrap_or(row_group.row_count);

        // Update the running offset and limit for after the current row group is read
        if let Some(offset) = &mut self.offset {
            // The reduction is caused by either the offset or the limit; as the limit is
            // applied only after the offset has been "exhausted", saturating_sub suffices here
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None)); // ruled out entire row group
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }
        // fetch the pages needed for decoding
        row_group
            // Final projection fetch shouldn't expand selection for cache; pass None
            .fetch(
                &mut self.input,
                &projection,
                plan_builder.selection(),
                batch_size,
                None,
            )
            .await?;

        let plan = plan_builder.build();

        let cache_options = cache_options_builder.consumer();
        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
            .with_cache_options(Some(&cache_options))
            .build_array_reader(self.fields.as_deref(), &projection)?;

        let reader = ParquetRecordBatchReader::new(array_reader, plan);

        Ok((self, Some(reader)))
    }

    /// Compute which columns should be cached: the union of the columns used by the
    /// filter predicates, intersected with the final (output) projection
    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
        let filters = self.filter.as_ref()?;
        let mut cache_projection = filters.predicates.first()?.projection().clone();
        for predicate in filters.predicates.iter() {
            cache_projection.union(predicate.projection());
        }
        cache_projection.intersect(projection);
        self.exclude_nested_columns_from_cache(&cache_projection)
    }

    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
        let schema = self.metadata.file_metadata().schema_descr();
        let num_leaves = schema.num_columns();

        // Count how many leaves each root column has
        let num_roots = schema.root_schema().get_fields().len();
        let mut root_leaf_counts = vec![0usize; num_roots];
        for leaf_idx in 0..num_leaves {
            let root_idx = schema.get_column_root_idx(leaf_idx);
            root_leaf_counts[root_idx] += 1;
        }

        // Keep only leaves whose root has exactly one leaf (non-nested)
        let mut included_leaves = Vec::new();
        for leaf_idx in 0..num_leaves {
            if mask.leaf_included(leaf_idx) {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                if root_leaf_counts[root_idx] == 1 {
                    included_leaves.push(leaf_idx);
                }
            }
        }

        if included_leaves.is_empty() {
            None
        } else {
            Some(ProjectionMask::leaves(schema, included_leaves))
        }
    }
}

enum StreamState<T> {
    /// At the start of a new row group, or the end of the parquet stream
    Init,
    /// Decoding a batch
    Decoding(ParquetRecordBatchReader),
    /// Reading data from input
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Error
    Error,
}

impl<T> std::fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamState::Init => write!(f, "StreamState::Init"),
            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
            StreamState::Error => write!(f, "StreamState::Error"),
        }
    }
}

/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
///
/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
/// allowing users to decode record batches separately from I/O.
///
/// # I/O Buffering
///
/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection + filtering, etc.) and decodes the rows from those buffered pages.
///
/// For example, if all rows and columns are selected, the entire row group is
/// buffered in memory during decode. This minimizes the number of I/O operations
/// required, which is especially important for object stores, where I/O operations
/// have latencies in the hundreds of milliseconds.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
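///
/// # Example
///
/// A minimal sketch of consuming the stream (the file name is illustrative; see
/// [`ParquetRecordBatchStreamBuilder::new`] for complete, runnable examples):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await
///     .unwrap()
///     .build()
///     .unwrap();
/// // Each poll fetches and buffers the pages needed for the next row group,
/// // then yields decoded batches until that row group is exhausted
/// while let Some(batch) = stream.try_next().await.unwrap() {
///     println!("read {} rows", batch.num_rows());
/// }
/// # }
/// ```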
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    schema: SchemaRef,

    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    selection: Option<RowSelection>,

    /// This is an option so it can be moved into a future
    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .field("projection", &self.projection)
            .field("state", &self.state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// Note that the schema metadata will be stripped here. See
    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the next row group from the stream.
    ///
    /// Users can continue to call this function to get row groups and decode them concurrently.
    ///
    /// ## Notes
    ///
    /// `ParquetRecordBatchStream` should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
    ///
    /// ## Returns
    ///
    /// - `Ok(None)` if the stream has ended.
    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
    /// - `Ok(Some(reader))` which holds all the data for the row group.
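    ///
    /// # Example
    ///
    /// A minimal sketch of decoding one row group at a time (the file name is
    /// illustrative). The returned [`ParquetRecordBatchReader`] is a synchronous
    /// iterator, so the CPU-bound decoding could also be moved to another task or
    /// thread while the next row group is fetched:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Fetch the data for each row group (async I/O), then decode its batches
    /// while let Some(reader) = stream.next_row_group().await.unwrap() {
    ///     for batch in reader {
    ///         let batch = batch.unwrap();
    ///         println!("read {} rows", batch.num_rows());
    ///     }
    /// }
    /// # }
    /// ```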
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // All rows skipped, read next row group
                        continue;
                    }
                }
                StreamState::Error => return Ok(None), // Ends the stream because an error occurred.
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    let reader = self.reader_factory.take().expect("lost reader factory");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader_factory = Some(reader_factory);
                        match maybe_reader {
                            // Read records from [`ParquetRecordBatchReader`]
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows skipped, read next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                StreamState::Error => return Poll::Ready(None), // Ends the stream because an error occurred.
            }
        }
    }
}

/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Column chunks for this row group
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
    /// Fetches any additional column data specified in `projection` that is not already
    /// present in `self.column_chunks`.
    ///
    /// If `selection` is provided, only the pages required for the selection
    /// are fetched. Otherwise, all pages are fetched.
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        batch_size: usize,
        cache_mask: Option<&ProjectionMask>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            let expanded_selection =
                selection.expand_to_batch_boundaries(batch_size, self.row_count);
            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
            // `RowSelection`
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page.
                    let mut ranges: Vec<Range<u64>> = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    // Expand selection to batch boundaries only for cached columns
                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
                    if use_expanded {
                        ranges.extend(
                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
                        );
                    } else {
                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    }
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Return chunks for column `i`
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Subset of data pages included in this sparse chunk.
        ///
        /// Each element is a tuple of (page offset within file, page data).
        /// Each entry is a complete page and the list is ordered by offset.
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and the offset within the original file
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Return the data for this column chunk at the given offset
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    /// Return the total length of the full column chunk
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
    };
    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::ParquetMetaDataReader;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::eq;
    use arrow::error::Result as ArrowResult;
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
        StructArray, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use rand::{rng, Rng};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Option<Arc<ParquetMetaData>>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl TestReader {
        fn new(data: Bytes) -> Self {
            Self {
                data,
                metadata: Default::default(),
                requests: Default::default(),
            }
        }
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            let range = range.clone();
            self.requests
                .lock()
                .unwrap()
                .push(range.start as usize..range.end as usize);
            futures::future::ready(Ok(self
                .data
                .slice(range.start as usize..range.end as usize)))
            .boxed()
        }

        fn get_metadata<'a>(
            &'a mut self,
            options: Option<&'a ArrowReaderOptions>,
        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
            );
            self.metadata = Some(Arc::new(
                metadata_reader.parse_and_finish(&self.data).unwrap(),
            ));
            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }
1324
1325    #[tokio::test]
1326    async fn test_async_reader_with_next_row_group() {
1327        let testdata = arrow::util::test_util::parquet_test_data();
1328        let path = format!("{testdata}/alltypes_plain.parquet");
1329        let data = Bytes::from(std::fs::read(path).unwrap());
1330
1331        let async_reader = TestReader::new(data.clone());
1332
1333        let requests = async_reader.requests.clone();
1334        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1335            .await
1336            .unwrap();
1337
1338        let metadata = builder.metadata().clone();
1339        assert_eq!(metadata.num_row_groups(), 1);
1340
1341        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1342        let mut stream = builder
1343            .with_projection(mask.clone())
1344            .with_batch_size(1024)
1345            .build()
1346            .unwrap();
1347
1348        let mut readers = vec![];
1349        while let Some(reader) = stream.next_row_group().await.unwrap() {
1350            readers.push(reader);
1351        }
1352
1353        let async_batches: Vec<_> = readers
1354            .into_iter()
1355            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1356            .collect();
1357
1358        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1359            .unwrap()
1360            .with_projection(mask)
1361            .with_batch_size(1024)
1362            .build()
1363            .unwrap()
1364            .collect::<ArrowResult<Vec<_>>>()
1365            .unwrap();
1366
1367        assert_eq!(async_batches, sync_batches);
1368
1369        let requests = requests.lock().unwrap();
1370        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1371        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1372
1373        assert_eq!(
1374            &requests[..],
1375            &[
1376                offset_1 as usize..(offset_1 + length_1) as usize,
1377                offset_2 as usize..(offset_2 + length_2) as usize
1378            ]
1379        );
1380    }
1381
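    // Builds the stream with the page index enabled and checks that the column and offset
    // indexes cover every row group and column before comparing against a synchronous read.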
1382    #[tokio::test]
1383    async fn test_async_reader_with_index() {
1384        let testdata = arrow::util::test_util::parquet_test_data();
1385        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1386        let data = Bytes::from(std::fs::read(path).unwrap());
1387
1388        let async_reader = TestReader::new(data.clone());
1389
1390        let options = ArrowReaderOptions::new().with_page_index(true);
1391        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1392            .await
1393            .unwrap();
1394
1395        // The builder should now have the column and offset indexes loaded
1396        let metadata_with_index = builder.metadata();
1397        assert_eq!(metadata_with_index.num_row_groups(), 1);
1398
1399        // Both the column index and the offset index should have one entry per row group
1400        let offset_index = metadata_with_index.offset_index().unwrap();
1401        let column_index = metadata_with_index.column_index().unwrap();
1402
1403        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1404        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1405
1406        let num_columns = metadata_with_index
1407            .file_metadata()
1408            .schema_descr()
1409            .num_columns();
1410
1411        // Each row group's indexes should contain an entry for every column
1412        offset_index
1413            .iter()
1414            .for_each(|x| assert_eq!(x.len(), num_columns));
1415        column_index
1416            .iter()
1417            .for_each(|x| assert_eq!(x.len(), num_columns));
1418
1419        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1420        let stream = builder
1421            .with_projection(mask.clone())
1422            .with_batch_size(1024)
1423            .build()
1424            .unwrap();
1425
1426        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1427
1428        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1429            .unwrap()
1430            .with_projection(mask)
1431            .with_batch_size(1024)
1432            .build()
1433            .unwrap()
1434            .collect::<ArrowResult<Vec<_>>>()
1435            .unwrap();
1436
1437        assert_eq!(async_batches, sync_batches);
1438    }
1439
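    // A row limit of 1 should yield the same result from the async stream as from the
    // synchronous reader.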
1440    #[tokio::test]
1441    async fn test_async_reader_with_limit() {
1442        let testdata = arrow::util::test_util::parquet_test_data();
1443        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1444        let data = Bytes::from(std::fs::read(path).unwrap());
1445
1446        let metadata = ParquetMetaDataReader::new()
1447            .parse_and_finish(&data)
1448            .unwrap();
1449        let metadata = Arc::new(metadata);
1450
1451        assert_eq!(metadata.num_row_groups(), 1);
1452
1453        let async_reader = TestReader::new(data.clone());
1454
1455        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1456            .await
1457            .unwrap();
1458
1459        assert_eq!(builder.metadata().num_row_groups(), 1);
1460
1461        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1462        let stream = builder
1463            .with_projection(mask.clone())
1464            .with_batch_size(1024)
1465            .with_limit(1)
1466            .build()
1467            .unwrap();
1468
1469        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1470
1471        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1472            .unwrap()
1473            .with_projection(mask)
1474            .with_batch_size(1024)
1475            .with_limit(1)
1476            .build()
1477            .unwrap()
1478            .collect::<ArrowResult<Vec<_>>>()
1479            .unwrap();
1480
1481        assert_eq!(async_batches, sync_batches);
1482    }
1483
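    // Uses a RowSelection that skips and selects across page boundaries to exercise
    // page-level skipping when the page index is loaded.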
1484    #[tokio::test]
1485    async fn test_async_reader_skip_pages() {
1486        let testdata = arrow::util::test_util::parquet_test_data();
1487        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1488        let data = Bytes::from(std::fs::read(path).unwrap());
1489
1490        let async_reader = TestReader::new(data.clone());
1491
1492        let options = ArrowReaderOptions::new().with_page_index(true);
1493        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1494            .await
1495            .unwrap();
1496
1497        assert_eq!(builder.metadata().num_row_groups(), 1);
1498
1499        let selection = RowSelection::from(vec![
1500            RowSelector::skip(21),   // Skip first page
1501            RowSelector::select(21), // Select page to boundary
1502            RowSelector::skip(41),   // Skip multiple pages
1503            RowSelector::select(41), // Select multiple pages
1504            RowSelector::skip(25),   // Skip page across boundary
1505            RowSelector::select(25), // Select across page boundary
1506            RowSelector::skip(7116), // Skip to final page boundary
1507            RowSelector::select(10), // Select final page
1508        ]);
1509
1510        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1511
1512        let stream = builder
1513            .with_projection(mask.clone())
1514            .with_row_selection(selection.clone())
1515            .build()
1516            .expect("building stream");
1517
1518        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1519
1520        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1521            .unwrap()
1522            .with_projection(mask)
1523            .with_batch_size(1024)
1524            .with_row_selection(selection)
1525            .build()
1526            .unwrap()
1527            .collect::<ArrowResult<Vec<_>>>()
1528            .unwrap();
1529
1530        assert_eq!(async_batches, sync_batches);
1531    }
1532
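    // Fuzzes random skip/select patterns over a randomly chosen column and checks that the
    // number of rows returned matches the selection.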
1533    #[tokio::test]
1534    async fn test_fuzz_async_reader_selection() {
1535        let testdata = arrow::util::test_util::parquet_test_data();
1536        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1537        let data = Bytes::from(std::fs::read(path).unwrap());
1538
1539        let mut rand = rng();
1540
1541        for _ in 0..100 {
1542            let mut expected_rows = 0;
1543            let mut total_rows = 0;
1544            let mut skip = false;
1545            let mut selectors = vec![];
1546
1547            while total_rows < 7300 {
1548                let row_count: usize = rand.random_range(1..100);
1549
1550                let row_count = row_count.min(7300 - total_rows);
1551
1552                selectors.push(RowSelector { row_count, skip });
1553
1554                total_rows += row_count;
1555                if !skip {
1556                    expected_rows += row_count;
1557                }
1558
1559                skip = !skip;
1560            }
1561
1562            let selection = RowSelection::from(selectors);
1563
1564            let async_reader = TestReader::new(data.clone());
1565
1566            let options = ArrowReaderOptions::new().with_page_index(true);
1567            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1568                .await
1569                .unwrap();
1570
1571            assert_eq!(builder.metadata().num_row_groups(), 1);
1572
1573            let col_idx: usize = rand.random_range(0..13);
1574            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1575
1576            let stream = builder
1577                .with_projection(mask.clone())
1578                .with_row_selection(selection.clone())
1579                .build()
1580                .expect("building stream");
1581
1582            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1583
1584            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1585
1586            assert_eq!(actual_rows, expected_rows);
1587        }
1588    }
1589
1590    #[tokio::test]
1591    async fn test_async_reader_zero_row_selector() {
1592        // See https://github.com/apache/arrow-rs/issues/2669
1593        let testdata = arrow::util::test_util::parquet_test_data();
1594        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1595        let data = Bytes::from(std::fs::read(path).unwrap());
1596
1597        let mut rand = rng();
1598
1599        let mut expected_rows = 0;
1600        let mut total_rows = 0;
1601        let mut skip = false;
1602        let mut selectors = vec![];
1603
1604        selectors.push(RowSelector {
1605            row_count: 0,
1606            skip: false,
1607        });
1608
1609        while total_rows < 7300 {
1610            let row_count: usize = rand.random_range(1..100);
1611
1612            let row_count = row_count.min(7300 - total_rows);
1613
1614            selectors.push(RowSelector { row_count, skip });
1615
1616            total_rows += row_count;
1617            if !skip {
1618                expected_rows += row_count;
1619            }
1620
1621            skip = !skip;
1622        }
1623
1624        let selection = RowSelection::from(selectors);
1625
1626        let async_reader = TestReader::new(data.clone());
1627
1628        let options = ArrowReaderOptions::new().with_page_index(true);
1629        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1630            .await
1631            .unwrap();
1632
1633        assert_eq!(builder.metadata().num_row_groups(), 1);
1634
1635        let col_idx: usize = rand.random_range(0..13);
1636        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1637
1638        let stream = builder
1639            .with_projection(mask.clone())
1640            .with_row_selection(selection.clone())
1641            .build()
1642            .expect("building stream");
1643
1644        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1645
1646        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1647
1648        assert_eq!(actual_rows, expected_rows);
1649    }
1650
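    // A single predicate on column "a" prunes rows before the projection is decoded;
    // exactly one fetch for the predicate and one for the projection are expected.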
1651    #[tokio::test]
1652    async fn test_row_filter() {
1653        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1654        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1655        let data = RecordBatch::try_from_iter([
1656            ("a", Arc::new(a) as ArrayRef),
1657            ("b", Arc::new(b) as ArrayRef),
1658        ])
1659        .unwrap();
1660
1661        let mut buf = Vec::with_capacity(1024);
1662        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1663        writer.write(&data).unwrap();
1664        writer.close().unwrap();
1665
1666        let data: Bytes = buf.into();
1667        let metadata = ParquetMetaDataReader::new()
1668            .parse_and_finish(&data)
1669            .unwrap();
1670        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1671
1672        let test = TestReader::new(data);
1673        let requests = test.requests.clone();
1674
1675        let a_scalar = StringArray::from_iter_values(["b"]);
1676        let a_filter = ArrowPredicateFn::new(
1677            ProjectionMask::leaves(&parquet_schema, vec![0]),
1678            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1679        );
1680
1681        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1682
1683        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1684        let stream = ParquetRecordBatchStreamBuilder::new(test)
1685            .await
1686            .unwrap()
1687            .with_projection(mask.clone())
1688            .with_batch_size(1024)
1689            .with_row_filter(filter)
1690            .build()
1691            .unwrap();
1692
1693        let batches: Vec<_> = stream.try_collect().await.unwrap();
1694        assert_eq!(batches.len(), 1);
1695
1696        let batch = &batches[0];
1697        assert_eq!(batch.num_columns(), 2);
1698
1699        // Filter should have kept only rows with "b" in column 0
1700        assert_eq!(
1701            batch.column(0).as_ref(),
1702            &StringArray::from_iter_values(["b", "b", "b"])
1703        );
1704        assert_eq!(
1705            batch.column(1).as_ref(),
1706            &StringArray::from_iter_values(["2", "3", "4"])
1707        );
1708
1709        // Should only have made 2 requests:
1710        // * First request fetches data for evaluating the predicate
1711        // * Second request fetches data for evaluating the projection
1712        assert_eq!(requests.lock().unwrap().len(), 2);
1713    }
1714
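    // Chains two predicates; each predicate and the final projection should trigger
    // exactly one fetch.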
1715    #[tokio::test]
1716    async fn test_two_row_filters() {
1717        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1718        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1719        let c = Int32Array::from_iter(0..6);
1720        let data = RecordBatch::try_from_iter([
1721            ("a", Arc::new(a) as ArrayRef),
1722            ("b", Arc::new(b) as ArrayRef),
1723            ("c", Arc::new(c) as ArrayRef),
1724        ])
1725        .unwrap();
1726
1727        let mut buf = Vec::with_capacity(1024);
1728        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1729        writer.write(&data).unwrap();
1730        writer.close().unwrap();
1731
1732        let data: Bytes = buf.into();
1733        let metadata = ParquetMetaDataReader::new()
1734            .parse_and_finish(&data)
1735            .unwrap();
1736        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1737
1738        let test = TestReader::new(data);
1739        let requests = test.requests.clone();
1740
1741        let a_scalar = StringArray::from_iter_values(["b"]);
1742        let a_filter = ArrowPredicateFn::new(
1743            ProjectionMask::leaves(&parquet_schema, vec![0]),
1744            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1745        );
1746
1747        let b_scalar = StringArray::from_iter_values(["4"]);
1748        let b_filter = ArrowPredicateFn::new(
1749            ProjectionMask::leaves(&parquet_schema, vec![1]),
1750            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1751        );
1752
1753        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1754
1755        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1756        let stream = ParquetRecordBatchStreamBuilder::new(test)
1757            .await
1758            .unwrap()
1759            .with_projection(mask.clone())
1760            .with_batch_size(1024)
1761            .with_row_filter(filter)
1762            .build()
1763            .unwrap();
1764
1765        let batches: Vec<_> = stream.try_collect().await.unwrap();
1766        assert_eq!(batches.len(), 1);
1767
1768        let batch = &batches[0];
1769        assert_eq!(batch.num_rows(), 1);
1770        assert_eq!(batch.num_columns(), 2);
1771
1772        let col = batch.column(0);
1773        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1774        assert_eq!(val, "b");
1775
1776        let col = batch.column(1);
1777        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1778        assert_eq!(val, 3);
1779
1780        // Should only have made 3 requests
1781        // * First request fetches data for evaluating the first predicate
1782        // * Second request fetches data for evaluating the second predicate
1783        // * Third request fetches data for evaluating the projection
1784        assert_eq!(requests.lock().unwrap().len(), 3);
1785    }
1786
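    // Verifies limit and offset are applied correctly across multiple row groups.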
1787    #[tokio::test]
1788    async fn test_limit_multiple_row_groups() {
1789        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1790        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1791        let c = Int32Array::from_iter(0..6);
1792        let data = RecordBatch::try_from_iter([
1793            ("a", Arc::new(a) as ArrayRef),
1794            ("b", Arc::new(b) as ArrayRef),
1795            ("c", Arc::new(c) as ArrayRef),
1796        ])
1797        .unwrap();
1798
1799        let mut buf = Vec::with_capacity(1024);
1800        let props = WriterProperties::builder()
1801            .set_max_row_group_size(3)
1802            .build();
1803        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1804        writer.write(&data).unwrap();
1805        writer.close().unwrap();
1806
1807        let data: Bytes = buf.into();
1808        let metadata = ParquetMetaDataReader::new()
1809            .parse_and_finish(&data)
1810            .unwrap();
1811
1812        assert_eq!(metadata.num_row_groups(), 2);
1813
1814        let test = TestReader::new(data);
1815
1816        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1817            .await
1818            .unwrap()
1819            .with_batch_size(1024)
1820            .with_limit(4)
1821            .build()
1822            .unwrap();
1823
1824        let batches: Vec<_> = stream.try_collect().await.unwrap();
1825        // Expect one batch for each row group
1826        assert_eq!(batches.len(), 2);
1827
1828        let batch = &batches[0];
1829        // First batch should contain all rows of the first row group
1830        assert_eq!(batch.num_rows(), 3);
1831        assert_eq!(batch.num_columns(), 3);
1832        let col2 = batch.column(2).as_primitive::<Int32Type>();
1833        assert_eq!(col2.values(), &[0, 1, 2]);
1834
1835        let batch = &batches[1];
1836        // Second batch should trigger the limit and only have one row
1837        assert_eq!(batch.num_rows(), 1);
1838        assert_eq!(batch.num_columns(), 3);
1839        let col2 = batch.column(2).as_primitive::<Int32Type>();
1840        assert_eq!(col2.values(), &[3]);
1841
1842        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1843            .await
1844            .unwrap()
1845            .with_offset(2)
1846            .with_limit(3)
1847            .build()
1848            .unwrap();
1849
1850        let batches: Vec<_> = stream.try_collect().await.unwrap();
1851        // Expect one batch for each row group
1852        assert_eq!(batches.len(), 2);
1853
1854        let batch = &batches[0];
1855        // First batch should contain one row
1856        assert_eq!(batch.num_rows(), 1);
1857        assert_eq!(batch.num_columns(), 3);
1858        let col2 = batch.column(2).as_primitive::<Int32Type>();
1859        assert_eq!(col2.values(), &[2]);
1860
1861        let batch = &batches[1];
1862        // Second batch should contain two rows
1863        assert_eq!(batch.num_rows(), 2);
1864        assert_eq!(batch.num_columns(), 3);
1865        let col2 = batch.column(2).as_primitive::<Int32Type>();
1866        assert_eq!(col2.values(), &[3, 4]);
1867
1868        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1869            .await
1870            .unwrap()
1871            .with_offset(4)
1872            .with_limit(20)
1873            .build()
1874            .unwrap();
1875
1876        let batches: Vec<_> = stream.try_collect().await.unwrap();
1877        // Should skip first row group
1878        assert_eq!(batches.len(), 1);
1879
1880        let batch = &batches[0];
1881        // First batch should contain two rows
1882        assert_eq!(batch.num_rows(), 2);
1883        assert_eq!(batch.num_columns(), 3);
1884        let col2 = batch.column(2).as_primitive::<Int32Type>();
1885        assert_eq!(col2.values(), &[4, 5]);
1886    }
1887
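    // Combines two row filters with the page index enabled and checks the total number
    // of selected rows.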
1888    #[tokio::test]
1889    async fn test_row_filter_with_index() {
1890        let testdata = arrow::util::test_util::parquet_test_data();
1891        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1892        let data = Bytes::from(std::fs::read(path).unwrap());
1893
1894        let metadata = ParquetMetaDataReader::new()
1895            .parse_and_finish(&data)
1896            .unwrap();
1897        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1898
1899        assert_eq!(metadata.num_row_groups(), 1);
1900
1901        let async_reader = TestReader::new(data.clone());
1902
1903        let a_filter =
1904            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1905                Ok(batch.column(0).as_boolean().clone())
1906            });
1907
1908        let b_scalar = Int8Array::from(vec![2]);
1909        let b_filter = ArrowPredicateFn::new(
1910            ProjectionMask::leaves(&parquet_schema, vec![2]),
1911            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1912        );
1913
1914        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1915
1916        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1917
1918        let options = ArrowReaderOptions::new().with_page_index(true);
1919        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1920            .await
1921            .unwrap()
1922            .with_projection(mask.clone())
1923            .with_batch_size(1024)
1924            .with_row_filter(filter)
1925            .build()
1926            .unwrap();
1927
1928        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1929
1930        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1931
1932        assert_eq!(total_rows, 730);
1933    }
1934
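    // Drives ReaderFactory::read_row_group directly with a selection that skips every
    // other page and asserts that only the selected pages are fetched.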
1935    #[tokio::test]
1936    #[allow(deprecated)]
1937    async fn test_in_memory_row_group_sparse() {
1938        let testdata = arrow::util::test_util::parquet_test_data();
1939        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1940        let data = Bytes::from(std::fs::read(path).unwrap());
1941
1942        let metadata = ParquetMetaDataReader::new()
1943            .with_page_indexes(true)
1944            .parse_and_finish(&data)
1945            .unwrap();
1946
1947        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1948
1949        let mut metadata_builder = metadata.into_builder();
1950        let mut row_groups = metadata_builder.take_row_groups();
1951        row_groups.truncate(1);
1952        let row_group_meta = row_groups.pop().unwrap();
1953
1954        let metadata = metadata_builder
1955            .add_row_group(row_group_meta)
1956            .set_column_index(None)
1957            .set_offset_index(Some(vec![offset_index.clone()]))
1958            .build();
1959
1960        let metadata = Arc::new(metadata);
1961
1962        let num_rows = metadata.row_group(0).num_rows();
1963
1964        assert_eq!(metadata.num_row_groups(), 1);
1965
1966        let async_reader = TestReader::new(data.clone());
1967
1968        let requests = async_reader.requests.clone();
1969        let (_, fields) = parquet_to_arrow_schema_and_fields(
1970            metadata.file_metadata().schema_descr(),
1971            ProjectionMask::all(),
1972            None,
1973        )
1974        .unwrap();
1975
1976        let _schema_desc = metadata.file_metadata().schema_descr();
1977
1978        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1979
1980        let reader_factory = ReaderFactory {
1981            metadata,
1982            fields: fields.map(Arc::new),
1983            input: async_reader,
1984            filter: None,
1985            limit: None,
1986            offset: None,
1987            metrics: ArrowReaderMetrics::disabled(),
1988            max_predicate_cache_size: 0,
1989        };
1990
1991        let mut skip = true;
1992        let mut pages = offset_index[0].page_locations.iter().peekable();
1993
1994        // Set up the `RowSelection` so that we skip every other page, with the last page selected
1995        let mut selectors = vec![];
1996        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1997        while let Some(page) = pages.next() {
1998            let num_rows = if let Some(next_page) = pages.peek() {
1999                next_page.first_row_index - page.first_row_index
2000            } else {
2001                num_rows - page.first_row_index
2002            };
2003
2004            if skip {
2005                selectors.push(RowSelector::skip(num_rows as usize));
2006            } else {
2007                selectors.push(RowSelector::select(num_rows as usize));
2008                let start = page.offset as usize;
2009                let end = start + page.compressed_page_size as usize;
2010                expected_page_requests.push(start..end);
2011            }
2012            skip = !skip;
2013        }
2014
2015        let selection = RowSelection::from(selectors);
2016
2017        let (_factory, _reader) = reader_factory
2018            .read_row_group(0, Some(selection), projection.clone(), 48)
2019            .await
2020            .expect("reading row group");
2021
2022        let requests = requests.lock().unwrap();
2023
2024        assert_eq!(&requests[..], &expected_page_requests)
2025    }
2026
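    // The requested batch size should be clamped to the number of rows in the file to
    // avoid over-allocating.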
2027    #[tokio::test]
2028    async fn test_batch_size_overallocate() {
2029        let testdata = arrow::util::test_util::parquet_test_data();
2030        // `alltypes_plain.parquet` only has 8 rows
2031        let path = format!("{testdata}/alltypes_plain.parquet");
2032        let data = Bytes::from(std::fs::read(path).unwrap());
2033
2034        let async_reader = TestReader::new(data.clone());
2035
2036        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2037            .await
2038            .unwrap();
2039
2040        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2041
2042        let stream = builder
2043            .with_projection(ProjectionMask::all())
2044            .with_batch_size(1024)
2045            .build()
2046            .unwrap();
2047        assert_ne!(1024, file_rows);
2048        assert_eq!(stream.batch_size, file_rows);
2049    }
2050
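    // Reads a bloom filter from a file whose metadata does not record bloom_filter_length.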
2051    #[tokio::test]
2052    async fn test_get_row_group_column_bloom_filter_without_length() {
2053        let testdata = arrow::util::test_util::parquet_test_data();
2054        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2055        let data = Bytes::from(std::fs::read(path).unwrap());
2056        test_get_row_group_column_bloom_filter(data, false).await;
2057    }
2058
2059    #[tokio::test]
2060    async fn test_parquet_record_batch_stream_schema() {
2061        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2062            schema.flattened_fields().iter().map(|f| f.name()).collect()
2063        }
2064
2065        // ParquetRecordBatchReaderBuilder::schema differs from
2066        // ParquetRecordBatchReader::schema and RecordBatch::schema in what it
2067        // returns: the custom metadata attached to the schema, and which fields
2068        // are included. This test ensures that behaviour stays consistent.
2069        //
2070        // The same checks are applied to the asynchronous counterparts.
2071
2072        // Prep data, for a schema with nested fields, with custom metadata
2073        let mut metadata = HashMap::with_capacity(1);
2074        metadata.insert("key".to_string(), "value".to_string());
2075
2076        let nested_struct_array = StructArray::from(vec![
2077            (
2078                Arc::new(Field::new("d", DataType::Utf8, true)),
2079                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2080            ),
2081            (
2082                Arc::new(Field::new("e", DataType::Utf8, true)),
2083                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2084            ),
2085        ]);
2086        let struct_array = StructArray::from(vec![
2087            (
2088                Arc::new(Field::new("a", DataType::Int32, true)),
2089                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2090            ),
2091            (
2092                Arc::new(Field::new("b", DataType::UInt64, true)),
2093                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2094            ),
2095            (
2096                Arc::new(Field::new(
2097                    "c",
2098                    nested_struct_array.data_type().clone(),
2099                    true,
2100                )),
2101                Arc::new(nested_struct_array) as ArrayRef,
2102            ),
2103        ]);
2104
2105        let schema =
2106            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2107        let record_batch = RecordBatch::from(struct_array)
2108            .with_schema(schema.clone())
2109            .unwrap();
2110
2111        // Write parquet with custom metadata in schema
2112        let mut file = tempfile().unwrap();
2113        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2114        writer.write(&record_batch).unwrap();
2115        writer.close().unwrap();
2116
2117        let all_fields = ["a", "b", "c", "d", "e"];
2118        // (leaf indices in the projection mask, expected flattened field names in the projected schema)
2119        let projections = [
2120            (vec![], vec![]),
2121            (vec![0], vec!["a"]),
2122            (vec![0, 1], vec!["a", "b"]),
2123            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2124            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2125        ];
2126
2127        // Ensure we're consistent for each of these projections
2128        for (indices, expected_projected_names) in projections {
2129            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2130                // Builder schema should preserve all fields and metadata
2131                assert_eq!(get_all_field_names(&builder), all_fields);
2132                assert_eq!(builder.metadata, metadata);
2133                // Reader & batch schema should show only projected fields, and no metadata
2134                assert_eq!(get_all_field_names(&reader), expected_projected_names);
2135                assert_eq!(reader.metadata, HashMap::default());
2136                assert_eq!(get_all_field_names(&batch), expected_projected_names);
2137                assert_eq!(batch.metadata, HashMap::default());
2138            };
2139
2140            let builder =
2141                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2142            let sync_builder_schema = builder.schema().clone();
2143            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2144            let mut reader = builder.with_projection(mask).build().unwrap();
2145            let sync_reader_schema = reader.schema();
2146            let batch = reader.next().unwrap().unwrap();
2147            let sync_batch_schema = batch.schema();
2148            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2149
2150            // asynchronous should be same
2151            let file = tokio::fs::File::from(file.try_clone().unwrap());
2152            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2153            let async_builder_schema = builder.schema().clone();
2154            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2155            let mut reader = builder.with_projection(mask).build().unwrap();
2156            let async_reader_schema = reader.schema().clone();
2157            let batch = reader.next().await.unwrap().unwrap();
2158            let async_batch_schema = batch.schema();
2159            assert_schemas(
2160                async_builder_schema,
2161                async_reader_schema,
2162                async_batch_schema,
2163            );
2164        }
2165    }
2166
2167    #[tokio::test]
2168    async fn test_get_row_group_column_bloom_filter_with_length() {
2169        // Rewrite into a new Parquet file whose metadata records bloom_filter_length
2170        let testdata = arrow::util::test_util::parquet_test_data();
2171        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2172        let data = Bytes::from(std::fs::read(path).unwrap());
2173        let async_reader = TestReader::new(data.clone());
2174        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2175            .await
2176            .unwrap();
2177        let schema = builder.schema().clone();
2178        let stream = builder.build().unwrap();
2179        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2180
2181        let mut parquet_data = Vec::new();
2182        let props = WriterProperties::builder()
2183            .set_bloom_filter_enabled(true)
2184            .build();
2185        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2186        for batch in batches {
2187            writer.write(&batch).unwrap();
2188        }
2189        writer.close().unwrap();
2190
2191        // test the new parquet file
2192        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2193    }
2194
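    // Shared helper: asserts whether bloom_filter_length is recorded, then loads the
    // bloom filter for row group 0, column 0 and probes a present and an absent value.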
2195    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2196        let async_reader = TestReader::new(data.clone());
2197
2198        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2199            .await
2200            .unwrap();
2201
2202        let metadata = builder.metadata();
2203        assert_eq!(metadata.num_row_groups(), 1);
2204        let row_group = metadata.row_group(0);
2205        let column = row_group.column(0);
2206        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2207
2208        let sbbf = builder
2209            .get_row_group_column_bloom_filter(0, 0)
2210            .await
2211            .unwrap()
2212            .unwrap();
2213        assert!(sbbf.check(&"Hello"));
2214        assert!(!sbbf.check(&"Hello_Not_Exists"));
2215    }
2216
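    // Verifies row skipping across pages works for a nested (list) column by checking
    // the selected row counts for several page-straddling selections.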
2217    #[tokio::test]
2218    async fn test_nested_skip() {
2219        let schema = Arc::new(Schema::new(vec![
2220            Field::new("col_1", DataType::UInt64, false),
2221            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2222        ]));
2223
2224        // Writer properties with small page and row group limits so the file contains many pages
2225        let props = WriterProperties::builder()
2226            .set_data_page_row_count_limit(256)
2227            .set_write_batch_size(256)
2228            .set_max_row_group_size(1024);
2229
2230        // Write data
2231        let mut file = tempfile().unwrap();
2232        let mut writer =
2233            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2234
2235        let mut builder = ListBuilder::new(StringBuilder::new());
2236        for id in 0..1024 {
2237            match id % 3 {
2238                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2239                1 => builder.append_value([Some(format!("id_{id}"))]),
2240                _ => builder.append_null(),
2241            }
2242        }
2243        let refs = vec![
2244            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2245            Arc::new(builder.finish()) as ArrayRef,
2246        ];
2247
2248        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2249        writer.write(&batch).unwrap();
2250        writer.close().unwrap();
2251
2252        let selections = [
2253            RowSelection::from(vec![
2254                RowSelector::skip(313),
2255                RowSelector::select(1),
2256                RowSelector::skip(709),
2257                RowSelector::select(1),
2258            ]),
2259            RowSelection::from(vec![
2260                RowSelector::skip(255),
2261                RowSelector::select(1),
2262                RowSelector::skip(767),
2263                RowSelector::select(1),
2264            ]),
2265            RowSelection::from(vec![
2266                RowSelector::select(255),
2267                RowSelector::skip(1),
2268                RowSelector::select(767),
2269                RowSelector::skip(1),
2270            ]),
2271            RowSelection::from(vec![
2272                RowSelector::skip(254),
2273                RowSelector::select(1),
2274                RowSelector::select(1),
2275                RowSelector::skip(767),
2276                RowSelector::select(1),
2277            ]),
2278        ];
2279
2280        for selection in selections {
2281            let expected = selection.row_count();
2282            // Read data
2283            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2284                tokio::fs::File::from_std(file.try_clone().unwrap()),
2285                ArrowReaderOptions::new().with_page_index(true),
2286            )
2287            .await
2288            .unwrap();
2289
2290            reader = reader.with_row_selection(selection);
2291
2292            let mut stream = reader.build().unwrap();
2293
2294            let mut total_rows = 0;
2295            while let Some(rb) = stream.next().await {
2296                let rb = rb.unwrap();
2297                total_rows += rb.num_rows();
2298            }
2299            assert_eq!(total_rows, expected);
2300        }
2301    }
2302
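    // Applies one predicate on a primitive column and one on a struct child, then checks
    // the filtered values and the number of fetches.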
2303    #[tokio::test]
2304    async fn test_row_filter_nested() {
2305        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2306        let b = StructArray::from(vec![
2307            (
2308                Arc::new(Field::new("aa", DataType::Utf8, true)),
2309                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2310            ),
2311            (
2312                Arc::new(Field::new("bb", DataType::Utf8, true)),
2313                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2314            ),
2315        ]);
2316        let c = Int32Array::from_iter(0..6);
2317        let data = RecordBatch::try_from_iter([
2318            ("a", Arc::new(a) as ArrayRef),
2319            ("b", Arc::new(b) as ArrayRef),
2320            ("c", Arc::new(c) as ArrayRef),
2321        ])
2322        .unwrap();
2323
2324        let mut buf = Vec::with_capacity(1024);
2325        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2326        writer.write(&data).unwrap();
2327        writer.close().unwrap();
2328
2329        let data: Bytes = buf.into();
2330        let metadata = ParquetMetaDataReader::new()
2331            .parse_and_finish(&data)
2332            .unwrap();
2333        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2334
2335        let test = TestReader::new(data);
2336        let requests = test.requests.clone();
2337
2338        let a_scalar = StringArray::from_iter_values(["b"]);
2339        let a_filter = ArrowPredicateFn::new(
2340            ProjectionMask::leaves(&parquet_schema, vec![0]),
2341            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2342        );
2343
2344        let b_scalar = StringArray::from_iter_values(["4"]);
2345        let b_filter = ArrowPredicateFn::new(
2346            ProjectionMask::leaves(&parquet_schema, vec![2]),
2347            move |batch| {
2348                // Filter on the second child of the struct (`b.bb`), the only leaf projected into this predicate's batch.
2349                let struct_array = batch
2350                    .column(0)
2351                    .as_any()
2352                    .downcast_ref::<StructArray>()
2353                    .unwrap();
2354                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2355            },
2356        );
2357
2358        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2359
2360        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2361        let stream = ParquetRecordBatchStreamBuilder::new(test)
2362            .await
2363            .unwrap()
2364            .with_projection(mask.clone())
2365            .with_batch_size(1024)
2366            .with_row_filter(filter)
2367            .build()
2368            .unwrap();
2369
2370        let batches: Vec<_> = stream.try_collect().await.unwrap();
2371        assert_eq!(batches.len(), 1);
2372
2373        let batch = &batches[0];
2374        assert_eq!(batch.num_rows(), 1);
2375        assert_eq!(batch.num_columns(), 2);
2376
2377        let col = batch.column(0);
2378        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2379        assert_eq!(val, "b");
2380
2381        let col = batch.column(1);
2382        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2383        assert_eq!(val, 3);
2384
2385        // Should only have made 3 requests
2386        // * First request fetches data for evaluating the first predicate
2387        // * Second request fetches data for evaluating the second predicate
2388        // * Third request fetches data for evaluating the projection
2389        assert_eq!(requests.lock().unwrap().len(), 3);
2390    }
2391
2392    #[tokio::test]
2393    async fn test_cache_projection_excludes_nested_columns() {
2394        use arrow_array::{ArrayRef, StringArray};
2395
2396        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
2397        let a = StringArray::from_iter_values(["r1", "r2"]);
2398        let b = StructArray::from(vec![
2399            (
2400                Arc::new(Field::new("aa", DataType::Utf8, true)),
2401                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
2402            ),
2403            (
2404                Arc::new(Field::new("bb", DataType::Utf8, true)),
2405                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
2406            ),
2407        ]);
2408
2409        let schema = Arc::new(Schema::new(vec![
2410            Field::new("a", DataType::Utf8, true),
2411            Field::new("b", b.data_type().clone(), true),
2412        ]));
2413
2414        let mut buf = Vec::new();
2415        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
2416        let batch = RecordBatch::try_from_iter([
2417            ("a", Arc::new(a) as ArrayRef),
2418            ("b", Arc::new(b) as ArrayRef),
2419        ])
2420        .unwrap();
2421        writer.write(&batch).unwrap();
2422        writer.close().unwrap();
2423
2424        // Load Parquet metadata
2425        let data: Bytes = buf.into();
2426        let metadata = ParquetMetaDataReader::new()
2427            .parse_and_finish(&data)
2428            .unwrap();
2429        let metadata = Arc::new(metadata);
2430
2431        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
2432        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
2433        let parquet_schema = metadata.file_metadata().schema_descr();
2434        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
2435
2436        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
2437            Ok(arrow_array::BooleanArray::from(vec![
2438                true;
2439                batch.num_rows()
2440            ]))
2441        });
2442        let filter = RowFilter::new(vec![Box::new(always_true)]);
2443
2444        // Construct a ReaderFactory and compute cache projection
2445        let reader_factory = ReaderFactory {
2446            metadata: Arc::clone(&metadata),
2447            fields: None,
2448            input: TestReader::new(data),
2449            filter: Some(filter),
2450            limit: None,
2451            offset: None,
2452            metrics: ArrowReaderMetrics::disabled(),
2453            max_predicate_cache_size: 0,
2454        };
2455
2456        // Provide an output projection that also selects the same nested leaf
2457        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
2458
2459        // Expect None since nested columns should be excluded from cache projection
2460        assert!(cache_projection.is_none());
2461    }
2462
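    // Regression test: an explicitly empty offset index must not cause a panic when
    // reading a row group.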
2463    #[tokio::test]
2464    #[allow(deprecated)]
2465    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2466        use tokio::fs::File;
2467        let testdata = arrow::util::test_util::parquet_test_data();
2468        let path = format!("{testdata}/alltypes_plain.parquet");
2469        let mut file = File::open(&path).await.unwrap();
2470        let file_size = file.metadata().await.unwrap().len();
2471        let mut metadata = ParquetMetaDataReader::new()
2472            .with_page_indexes(true)
2473            .load_and_finish(&mut file, file_size)
2474            .await
2475            .unwrap();
2476
2477        metadata.set_offset_index(Some(vec![]));
2478        let options = ArrowReaderOptions::new().with_page_index(true);
2479        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2480        let reader =
2481            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2482                .build()
2483                .unwrap();
2484
2485        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2486        assert_eq!(result.len(), 1);
2487    }
2488
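    // Companion to the test above: reading with a fully populated offset index supplied
    // via ArrowReaderMetadata should also succeed.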
2489    #[tokio::test]
2490    #[allow(deprecated)]
2491    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2492        use tokio::fs::File;
2493        let testdata = arrow::util::test_util::parquet_test_data();
2494        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2495        let mut file = File::open(&path).await.unwrap();
2496        let file_size = file.metadata().await.unwrap().len();
2497        let metadata = ParquetMetaDataReader::new()
2498            .with_page_indexes(true)
2499            .load_and_finish(&mut file, file_size)
2500            .await
2501            .unwrap();
2502
2503        let options = ArrowReaderOptions::new().with_page_index(true);
2504        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2505        let reader =
2506            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2507                .build()
2508                .unwrap();
2509
2510        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2511        assert_eq!(result.len(), 8);
2512    }
2513
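    // Round-trips the metadata (including page indexes) through ParquetMetaDataWriter and
    // ParquetMetaDataReader and ensures reading the file with it does not panic.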
2514    #[tokio::test]
2515    #[allow(deprecated)]
2516    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2517        use tempfile::TempDir;
2518        use tokio::fs::File;
2519        fn write_metadata_to_local_file(
2520            metadata: ParquetMetaData,
2521            file: impl AsRef<std::path::Path>,
2522        ) {
2523            use crate::file::metadata::ParquetMetaDataWriter;
2524            use std::fs::File;
2525            let file = File::create(file).unwrap();
2526            ParquetMetaDataWriter::new(file, &metadata)
2527                .finish()
2528                .unwrap()
2529        }
2530
2531        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2532            use std::fs::File;
2533            let file = File::open(file).unwrap();
2534            ParquetMetaDataReader::new()
2535                .with_page_indexes(true)
2536                .parse_and_finish(&file)
2537                .unwrap()
2538        }
2539
2540        let testdata = arrow::util::test_util::parquet_test_data();
2541        let path = format!("{testdata}/alltypes_plain.parquet");
2542        let mut file = File::open(&path).await.unwrap();
2543        let file_size = file.metadata().await.unwrap().len();
2544        let metadata = ParquetMetaDataReader::new()
2545            .with_page_indexes(true)
2546            .load_and_finish(&mut file, file_size)
2547            .await
2548            .unwrap();
2549
2550        let tempdir = TempDir::new().unwrap();
2551        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2552        write_metadata_to_local_file(metadata, &metadata_path);
2553        let metadata = read_metadata_from_local_file(&metadata_path);
2554
2555        let options = ArrowReaderOptions::new().with_page_index(true);
2556        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2557        let reader =
2558            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2559                .build()
2560                .unwrap();
2561
2562        // Collecting should not panic
2563        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2564        assert_eq!(result.len(), 1);
2565    }
2566
2567    #[tokio::test]
2568    async fn test_cached_array_reader_sparse_offset_error() {
2569        use futures::TryStreamExt;
2570
2571        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2572        use arrow_array::{BooleanArray, RecordBatch};
2573
2574        let testdata = arrow::util::test_util::parquet_test_data();
2575        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2576        let data = Bytes::from(std::fs::read(path).unwrap());
2577
2578        let async_reader = TestReader::new(data);
2579
2580        // Enable page index so the fetch logic loads only required pages
2581        let options = ArrowReaderOptions::new().with_page_index(true);
2582        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2583            .await
2584            .unwrap();
2585
2586        // Skip the first 22 rows (entire first Parquet page) and then select the
2587        // next 3 rows (22, 23, 24). This means the fetch step will not include
2588        // the first page starting at file offset 0.
2589        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2590
2591        // Trivial predicate on column 0 that always returns `true`. Using the
2592        // same column in both predicate and projection activates the caching
2593        // layer (Producer/Consumer pattern).
2594        let parquet_schema = builder.parquet_schema();
2595        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2596        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2597            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2598        });
2599        let filter = RowFilter::new(vec![Box::new(always_true)]);
2600
2601        // Build the stream with batch size 8 so the cache reads whole batches
2602        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2603        let stream = builder
2604            .with_batch_size(8)
2605            .with_projection(proj)
2606            .with_row_selection(selection)
2607            .with_row_filter(filter)
2608            .build()
2609            .unwrap();
2610
2611        // Collecting the stream should succeed; this exercises the sparse column
2612        // chunk offset handling that previously triggered an error.
2613        let _result: Vec<_> = stream.try_collect().await.unwrap();
2614    }
2615}