parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46    RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
72/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
73///
74/// Notes:
75///
76/// 1. There is a default implementation for types that implement [`AsyncRead`]
77///    and [`AsyncSeek`], for example [`tokio::fs::File`].
78///
79/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
80///    is enabled, implements this interface for [`ObjectStore`].
81///
82/// [`ObjectStore`]: object_store::ObjectStore
83///
84/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
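///
/// # Example: in-memory implementation
///
/// A minimal sketch of a custom implementation, modeled on the in-memory
/// reader used by this module's tests. The hypothetical `InMemoryReader`
/// serves a Parquet file already loaded into [`Bytes`] and parses the footer
/// on demand (error handling elided):
///
/// ```ignore
/// use std::ops::Range;
/// use std::sync::Arc;
/// use bytes::Bytes;
/// use futures::future::{BoxFuture, FutureExt};
/// use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// use parquet::arrow::async_reader::AsyncFileReader;
/// use parquet::errors::Result;
/// use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
///
/// struct InMemoryReader {
///     data: Bytes,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // The whole file is already in memory, so this is a synchronous slice
///         let data = self.data.slice(range.start as usize..range.end as usize);
///         futures::future::ready(Ok(data)).boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         async move {
///             // Parse the footer (and nothing else) from the in-memory bytes
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&self.data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```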
85pub trait AsyncFileReader: Send {
86    /// Retrieve the bytes in `range`
87    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
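    ///
    /// Implementations that can fetch many ranges more efficiently should
    /// override this. A hedged sketch of one possible strategy, which issues a
    /// single read spanning all requested ranges and then slices it, trading
    /// extra bytes transferred for fewer round trips:
    ///
    /// ```ignore
    /// fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
    ///     async move {
    ///         // Compute the smallest range covering every requested range
    ///         let start = ranges.iter().map(|r| r.start).min().unwrap_or(0);
    ///         let end = ranges.iter().map(|r| r.end).max().unwrap_or(0);
    ///         let data = self.get_bytes(start..end).await?;
    ///         // Hand back a zero-copy slice for each original range
    ///         Ok(ranges
    ///             .into_iter()
    ///             .map(|r| data.slice((r.start - start) as usize..(r.end - start) as usize))
    ///             .collect())
    ///     }
    ///     .boxed()
    /// }
    /// ```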
90    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91        async move {
92            let mut result = Vec::with_capacity(ranges.len());
93
94            for range in ranges.into_iter() {
95                let data = self.get_bytes(range).await?;
96                result.push(data);
97            }
98
99            Ok(result)
100        }
101        .boxed()
102    }
103
104    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
105    ///
106    /// This is an asynchronous operation as it may involve reading the file
107    /// footer and potentially other metadata from disk or a remote source.
108    ///
109    /// Reading data from Parquet requires the metadata to understand the
110    /// schema, row groups, and location of pages within the file. This metadata
111    /// is stored primarily in the footer of the Parquet file, and can be read using
112    /// [`ParquetMetaDataReader`].
113    ///
114    /// However, implementations can significantly speed up reading Parquet by
115    /// supplying cached metadata or pre-fetched metadata via this API.
116    ///
117    /// # Parameters
118    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
119    ///   and other options that affect how the metadata is read.
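    ///
    /// # Example: caching the metadata
    ///
    /// A sketch of one way to avoid re-reading the footer on repeated calls,
    /// assuming a hypothetical wrapper type with an `inner` reader and a
    /// `cached: Option<Arc<ParquetMetaData>>` field:
    ///
    /// ```ignore
    /// fn get_metadata<'a>(
    ///     &'a mut self,
    ///     options: Option<&'a ArrowReaderOptions>,
    /// ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///     async move {
    ///         if let Some(metadata) = &self.cached {
    ///             // Serve the previously loaded metadata without any I/O
    ///             return Ok(Arc::clone(metadata));
    ///         }
    ///         let metadata = self.inner.get_metadata(options).await?;
    ///         self.cached = Some(Arc::clone(&metadata));
    ///         Ok(metadata)
    ///     }
    ///     .boxed()
    /// }
    /// ```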
120    fn get_metadata<'a>(
121        &'a mut self,
122        options: Option<&'a ArrowReaderOptions>,
123    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
127impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129        self.as_mut().get_bytes(range)
130    }
131
132    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133        self.as_mut().get_byte_ranges(ranges)
134    }
135
136    fn get_metadata<'a>(
137        &'a mut self,
138        options: Option<&'a ArrowReaderOptions>,
139    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140        self.as_mut().get_metadata(options)
141    }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146        async move {
147            self.seek(SeekFrom::End(-(suffix as i64))).await?;
148            let mut buf = Vec::with_capacity(suffix);
149            self.take(suffix as _).read_to_end(&mut buf).await?;
150            Ok(buf.into())
151        }
152        .boxed()
153    }
154}
155
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158        async move {
159            self.seek(SeekFrom::Start(range.start)).await?;
160
161            let to_read = range.end - range.start;
162            let mut buffer = Vec::with_capacity(to_read.try_into()?);
163            let read = self.take(to_read).read_to_end(&mut buffer).await?;
164            if read as u64 != to_read {
165                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166            }
167
168            Ok(buffer.into())
169        }
170        .boxed()
171    }
172
173    fn get_metadata<'a>(
174        &'a mut self,
175        options: Option<&'a ArrowReaderOptions>,
176    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177        async move {
178            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180            );
181
182            #[cfg(feature = "encryption")]
183            let metadata_reader = metadata_reader.with_decryption_properties(
184                options.and_then(|o| o.file_decryption_properties.as_ref()),
185            );
186
187            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
188            Ok(Arc::new(parquet_metadata))
189        }
190        .boxed()
191    }
192}
193
194impl ArrowReaderMetadata {
195    /// Returns a new [`ArrowReaderMetadata`] for this builder
196    ///
197    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
198    pub async fn load_async<T: AsyncFileReader>(
199        input: &mut T,
200        options: ArrowReaderOptions,
201    ) -> Result<Self> {
202        let metadata = input.get_metadata(Some(&options)).await?;
203        Self::try_new(metadata, options)
204    }
205}
206
207#[doc(hidden)]
208/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
209///
210/// Allows sharing the same builder for both the sync and async versions, whilst also not
211/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
212pub struct AsyncReader<T>(T);
213
214/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
215///
216/// This can be used to decode a Parquet file in streaming fashion (without
217/// downloading the whole file at once) from a remote source, such as an object store.
218///
219/// This builder handles reading the parquet file metadata, allowing consumers
220/// to use this information to select what specific columns, row groups, etc.
221/// they wish to be read by the resulting stream.
222///
223/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
224///
225/// See [`ArrowReaderBuilder`] for additional member functions
226pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
227
228impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
229    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
230    /// specified source.
231    ///
232    /// # Example
233    /// ```
234    /// # #[tokio::main(flavor="current_thread")]
235    /// # async fn main() {
236    /// #
237    /// # use arrow_array::RecordBatch;
238    /// # use arrow::util::pretty::pretty_format_batches;
239    /// # use futures::TryStreamExt;
240    /// #
241    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
242    /// #
243    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
244    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
245    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
246    /// #     assert_eq!(
247    /// #          &actual_lines, expected_lines,
248    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
249    /// #          expected_lines, actual_lines
250    /// #      );
251    /// #  }
252    /// #
253    /// # let testdata = arrow::util::test_util::parquet_test_data();
254    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
255    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
256    /// // another async I/O reader such as a reader from an object store.
257    /// let file = tokio::fs::File::open(path).await.unwrap();
258    ///
259    /// // Configure options for reading from the async source
260    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
261    ///     .await
262    ///     .unwrap();
263    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
264    /// // a stream that can be used to incrementally read the data in batches
265    /// let stream = builder.build().unwrap();
266    /// // In this example, we collect the stream into a Vec<RecordBatch>
267    /// // but real applications would likely process the batches as they are read
268    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
269    /// // Demonstrate the results are as expected
270    /// assert_batches_eq(
271    ///     &results,
272    ///     &[
273    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
274    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
275    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
276    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
277    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
278    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
279    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
280    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
281    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
282    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
283    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
284    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
285    ///      ],
286    ///  );
287    /// # }
288    /// ```
289    ///
290    /// # Example configuring options and reading metadata
291    ///
292    /// There are many options that control the behavior of the reader, such as
293    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
294    ///
295    /// ```
296    /// # #[tokio::main(flavor="current_thread")]
297    /// # async fn main() {
298    /// #
299    /// # use arrow_array::RecordBatch;
300    /// # use arrow::util::pretty::pretty_format_batches;
301    /// # use futures::TryStreamExt;
302    /// #
303    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
304    /// #
305    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
306    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
307    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
308    /// #     assert_eq!(
309    /// #          &actual_lines, expected_lines,
310    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
311    /// #          expected_lines, actual_lines
312    /// #      );
313    /// #  }
314    /// #
315    /// # let testdata = arrow::util::test_util::parquet_test_data();
316    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
317    /// // As before, use tokio::fs::File to read data using async I/O.
318    /// let file = tokio::fs::File::open(path).await.unwrap();
319    ///
320    /// // Configure options for reading from the async source. In this case we set the batch size
321    /// // to 3, which produces batches of up to 3 rows at a time.
322    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
323    ///     .await
324    ///     .unwrap()
325    ///     .with_batch_size(3);
326    ///
327    /// // We can also read the metadata to inspect the schema and other metadata
328    /// // before actually reading the data
329    /// let file_metadata = builder.metadata().file_metadata();
330    /// // Specify that we only want to read the root columns at indexes 1, 2, and 6
331    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
332    ///
333    /// let stream = builder.with_projection(mask).build().unwrap();
334    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
335    /// // Print out the results
336    /// assert_batches_eq(
337    ///     &results,
338    ///     &[
339    ///         "+----------+-------------+-----------+",
340    ///         "| bool_col | tinyint_col | float_col |",
341    ///         "+----------+-------------+-----------+",
342    ///         "| true     | 0           | 0.0       |",
343    ///         "| false    | 1           | 1.1       |",
344    ///         "| true     | 0           | 0.0       |",
345    ///         "| false    | 1           | 1.1       |",
346    ///         "| true     | 0           | 0.0       |",
347    ///         "| false    | 1           | 1.1       |",
348    ///         "| true     | 0           | 0.0       |",
349    ///         "| false    | 1           | 1.1       |",
350    ///         "+----------+-------------+-----------+",
351    ///      ],
352    ///  );
353    ///
354    /// // The results have 8 rows, so since we set the batch size to 3, we expect
355    /// // 3 batches: two with 3 rows each and the last with 2 rows.
356    /// assert_eq!(results.len(), 3);
357    /// # }
358    /// ```
359    pub async fn new(input: T) -> Result<Self> {
360        Self::new_with_options(input, Default::default()).await
361    }
362
363    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
364    /// and [`ArrowReaderOptions`].
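    ///
    /// # Example
    ///
    /// A sketch that also loads the page index so that row selections can prune
    /// individual pages (assumes `path` points at a Parquet file):
    ///
    /// ```ignore
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    ///
    /// let file = tokio::fs::File::open(path).await?;
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
    /// let stream = builder.build()?;
    /// ```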
365    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
366        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
367        Ok(Self::new_with_metadata(input, metadata))
368    }
369
370    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
371    ///
372    /// This allows loading metadata once and using it to create multiple builders with
373    /// potentially different settings that can be read in parallel.
374    ///
375    /// # Example of reading from multiple streams in parallel
376    ///
377    /// ```
378    /// # use std::fs::metadata;
379    /// # use std::sync::Arc;
380    /// # use bytes::Bytes;
381    /// # use arrow_array::{Int32Array, RecordBatch};
382    /// # use arrow_schema::{DataType, Field, Schema};
383    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
384    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
385    /// # use tempfile::tempfile;
386    /// # use futures::StreamExt;
387    /// # #[tokio::main(flavor="current_thread")]
388    /// # async fn main() {
389    /// #
390    /// # let mut file = tempfile().unwrap();
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// // open file with parquet data
397    /// let mut file = tokio::fs::File::from_std(file);
398    /// // load metadata once
399    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
400    /// // create two readers, a and b, from the same underlying file
401    /// // without reading the metadata again
402    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
403    ///     file.try_clone().await.unwrap(),
404    ///     meta.clone()
405    /// ).build().unwrap();
406    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
407    ///
408    /// // Can read batches from both readers in parallel
409    /// assert_eq!(
410    ///   a.next().await.unwrap().unwrap(),
411    ///   b.next().await.unwrap().unwrap(),
412    /// );
413    /// # }
414    /// ```
415    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
416        Self::new_builder(AsyncReader(input), metadata)
417    }
418
419    /// Read bloom filter for a column in a row group
420    ///
421    /// Returns `None` if the column does not have a bloom filter
422    ///
423    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
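    ///
    /// # Example
    ///
    /// A sketch of using a bloom filter to skip a row group before fetching any
    /// data pages. It assumes `path` points at a Parquet file whose column `0`
    /// stores 32-bit integers, and probes for a hypothetical value `123`:
    ///
    /// ```ignore
    /// let file = tokio::fs::File::open(path).await?;
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    ///
    /// // Probe the bloom filter of column 0 in row group 0
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     if !sbbf.check(&123_i32) {
    ///         // The value is definitely absent from row group 0, so skip it entirely
    ///         builder = builder.with_row_groups(vec![]);
    ///     }
    /// }
    /// let stream = builder.build()?;
    /// ```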
424    pub async fn get_row_group_column_bloom_filter(
425        &mut self,
426        row_group_idx: usize,
427        column_idx: usize,
428    ) -> Result<Option<Sbbf>> {
429        let metadata = self.metadata.row_group(row_group_idx);
430        let column_metadata = metadata.column(column_idx);
431
432        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
433            offset
434                .try_into()
435                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
436        } else {
437            return Ok(None);
438        };
439
440        let buffer = match column_metadata.bloom_filter_length() {
441            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
442            None => self
443                .input
444                .0
445                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
446        }
447        .await?;
448
449        let (header, bitset_offset) =
450            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
451
452        match header.algorithm {
453            BloomFilterAlgorithm::BLOCK(_) => {
454                // this match exists to future proof the singleton algorithm enum
455            }
456        }
457        match header.compression {
458            BloomFilterCompression::UNCOMPRESSED(_) => {
459                // this match exists to future proof the singleton compression enum
460            }
461        }
462        match header.hash {
463            BloomFilterHash::XXHASH(_) => {
464                // this match exists to future proof the singleton hash enum
465            }
466        }
467
468        let bitset = match column_metadata.bloom_filter_length() {
469            Some(_) => buffer.slice(
470                (TryInto::<usize>::try_into(bitset_offset).unwrap()
471                    - TryInto::<usize>::try_into(offset).unwrap())..,
472            ),
473            None => {
474                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
475                    ParquetError::General("Bloom filter length is invalid".to_string())
476                })?;
477                self.input
478                    .0
479                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
480                    .await?
481            }
482        };
483        Ok(Some(Sbbf::new(&bitset)))
484    }
485
486    /// Build a new [`ParquetRecordBatchStream`]
487    ///
488    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
489    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
490        let num_row_groups = self.metadata.row_groups().len();
491
492        let row_groups = match self.row_groups {
493            Some(row_groups) => {
494                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
495                    return Err(general_err!(
496                        "row group {} out of bounds 0..{}",
497                        col,
498                        num_row_groups
499                    ));
500                }
501                row_groups.into()
502            }
503            None => (0..self.metadata.row_groups().len()).collect(),
504        };
505
506        // Try to avoid allocating a large buffer
507        let batch_size = self
508            .batch_size
509            .min(self.metadata.file_metadata().num_rows() as usize);
510        let reader_factory = ReaderFactory {
511            input: self.input.0,
512            filter: self.filter,
513            metadata: self.metadata.clone(),
514            fields: self.fields,
515            limit: self.limit,
516            offset: self.offset,
517            metrics: self.metrics,
518            max_predicate_cache_size: self.max_predicate_cache_size,
519        };
520
521        // Ensure schema of ParquetRecordBatchStream respects projection, and does
522        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
523        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
524            Some(DataType::Struct(fields)) => {
525                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
526            }
527            None => Fields::empty(),
528            _ => unreachable!("Must be Struct for root type"),
529        };
530        let schema = Arc::new(Schema::new(projected_fields));
531
532        Ok(ParquetRecordBatchStream {
533            metadata: self.metadata,
534            batch_size,
535            row_groups,
536            projection: self.projection,
537            selection: self.selection,
538            schema,
539            reader_factory: Some(reader_factory),
540            state: StreamState::Init,
541        })
542    }
543}
544
545/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
546///
547/// Note: If all rows are filtered out in the row group (e.g. by filters, limit or
548/// offset), returns `None` for the reader.
549type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
550
551/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
552/// [`ParquetRecordBatchReader`]
553struct ReaderFactory<T> {
554    metadata: Arc<ParquetMetaData>,
555
556    /// Top level parquet schema
557    fields: Option<Arc<ParquetField>>,
558
559    input: T,
560
561    /// Optional filter
562    filter: Option<RowFilter>,
563
564    /// Limit to apply to remaining row groups.  
565    limit: Option<usize>,
566
567    /// Offset to apply to the remaining row groups
568    offset: Option<usize>,
569
570    /// Metrics
571    metrics: ArrowReaderMetrics,
572
573    /// Maximum size of the predicate cache
574    max_predicate_cache_size: usize,
575}
576
577impl<T> ReaderFactory<T>
578where
579    T: AsyncFileReader + Send,
580{
581    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
582    ///
583    /// Updates the `limit` and `offset` of the reader factory
584    ///
585    /// Note: this captures self so that the resulting future has a static lifetime
586    async fn read_row_group(
587        mut self,
588        row_group_idx: usize,
589        selection: Option<RowSelection>,
590        projection: ProjectionMask,
591        batch_size: usize,
592    ) -> ReadResult<T> {
593        // TODO: calling build_array multiple times is wasteful
594
595        let meta = self.metadata.row_group(row_group_idx);
596        let offset_index = self
597            .metadata
598            .offset_index()
599            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
600            .filter(|index| !index.is_empty())
601            .map(|x| x[row_group_idx].as_slice());
602
603        // Reuse columns that are selected and used by the filters
604        let cache_projection = match self.compute_cache_projection(&projection) {
605            Some(projection) => projection,
606            None => ProjectionMask::none(meta.columns().len()),
607        };
608        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
609            batch_size,
610            self.max_predicate_cache_size,
611        )));
612
613        let mut row_group = InMemoryRowGroup {
614            // schema: meta.schema_descr_ptr(),
615            row_count: meta.num_rows() as usize,
616            column_chunks: vec![None; meta.columns().len()],
617            offset_index,
618            row_group_idx,
619            metadata: self.metadata.as_ref(),
620        };
621
622        let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
623
624        let filter = self.filter.as_mut();
625        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627        // Update selection based on any filters
628        if let Some(filter) = filter {
629            let cache_options = cache_options_builder.clone().producer();
630
631            for predicate in filter.predicates.iter_mut() {
632                if !plan_builder.selects_any() {
633                    return Ok((self, None)); // ruled out entire row group
634                }
635
636                // (pre) Fetch only the columns that are selected by the predicate
637                let selection = plan_builder.selection();
638                // Fetch predicate columns; expand selection only for cached predicate columns
639                let cache_mask = Some(&cache_projection);
640                row_group
641                    .fetch(
642                        &mut self.input,
643                        predicate.projection(),
644                        selection,
645                        batch_size,
646                        cache_mask,
647                    )
648                    .await?;
649
650                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651                    .with_cache_options(Some(&cache_options))
652                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655            }
656        }
657
658        // Compute the number of rows in the selection before applying limit and offset
659        let rows_before = plan_builder
660            .num_rows_selected()
661            .unwrap_or(row_group.row_count);
662
663        if rows_before == 0 {
664            return Ok((self, None)); // ruled out entire row group
665        }
666
667        // Apply any limit and offset
668        let plan_builder = plan_builder
669            .limited(row_group.row_count)
670            .with_offset(self.offset)
671            .with_limit(self.limit)
672            .build_limited();
673
674        let rows_after = plan_builder
675            .num_rows_selected()
676            .unwrap_or(row_group.row_count);
677
678        // Update running offset and limit for after the current row group is read
679        if let Some(offset) = &mut self.offset {
680            // Reduction is either because of offset or limit; as limit is applied
681            // after offset has been "exhausted", we can just use a saturating sub here
682            *offset = offset.saturating_sub(rows_before - rows_after)
683        }
684
685        if rows_after == 0 {
686            return Ok((self, None)); // ruled out entire row group
687        }
688
689        if let Some(limit) = &mut self.limit {
690            *limit -= rows_after;
691        }
692        // fetch the pages needed for decoding
693        row_group
694            // Final projection fetch shouldn't expand selection for cache; pass None
695            .fetch(
696                &mut self.input,
697                &projection,
698                plan_builder.selection(),
699                batch_size,
700                None,
701            )
702            .await?;
703
704        let plan = plan_builder.build();
705
706        let cache_options = cache_options_builder.consumer();
707        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708            .with_cache_options(Some(&cache_options))
709            .build_array_reader(self.fields.as_deref(), &projection)?;
710
711        let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713        Ok((self, Some(reader)))
714    }
715
716    /// Compute which columns are used in filters and the final (output) projection
717    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718        let filters = self.filter.as_ref()?;
719        let mut cache_projection = filters.predicates.first()?.projection().clone();
720        for predicate in filters.predicates.iter() {
721            cache_projection.union(predicate.projection());
722        }
723        cache_projection.intersect(projection);
724        self.exclude_nested_columns_from_cache(&cache_projection)
725    }
726
727    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
728    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
729        let schema = self.metadata.file_metadata().schema_descr();
730        let num_leaves = schema.num_columns();
731
732        // Count how many leaves each root column has
733        let num_roots = schema.root_schema().get_fields().len();
734        let mut root_leaf_counts = vec![0usize; num_roots];
735        for leaf_idx in 0..num_leaves {
736            let root_idx = schema.get_column_root_idx(leaf_idx);
737            root_leaf_counts[root_idx] += 1;
738        }
739
740        // Keep only leaves whose root has exactly one leaf (non-nested)
741        let mut included_leaves = Vec::new();
742        for leaf_idx in 0..num_leaves {
743            if mask.leaf_included(leaf_idx) {
744                let root_idx = schema.get_column_root_idx(leaf_idx);
745                if root_leaf_counts[root_idx] == 1 {
746                    included_leaves.push(leaf_idx);
747                }
748            }
749        }
750
751        if included_leaves.is_empty() {
752            None
753        } else {
754            Some(ProjectionMask::leaves(schema, included_leaves))
755        }
756    }
757}
758
759enum StreamState<T> {
760    /// At the start of a new row group, or the end of the parquet stream
761    Init,
762    /// Decoding a batch
763    Decoding(ParquetRecordBatchReader),
764    /// Reading data from input
765    Reading(BoxFuture<'static, ReadResult<T>>),
766    /// Error
767    Error,
768}
769
770impl<T> std::fmt::Debug for StreamState<T> {
771    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772        match self {
773            StreamState::Init => write!(f, "StreamState::Init"),
774            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
775            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
776            StreamState::Error => write!(f, "StreamState::Error"),
777        }
778    }
779}
780
781/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
782///
783/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
784/// allowing users to decode record batches separately from I/O.
785///
786/// # I/O Buffering
787///
788/// `ParquetRecordBatchStream` buffers *all* data pages selected after applying the
789/// projection and any predicates (filtering, etc.) and decodes the rows from those
790/// buffered pages.
791///
792/// For example, if all rows and columns are selected, the entire row group is
793/// buffered in memory during decode. This minimizes the number of IO operations
794/// required, which is especially important for object stores, where IO operations
795/// have latencies in the hundreds of milliseconds.
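///
/// One way to bound the amount of buffered data is to select fewer columns and/or
/// fewer rows. A hedged sketch, assuming `file` is an open async source, the page
/// index has been loaded (see [`ArrowReaderOptions::with_page_index`]) so that only
/// the pages overlapping the selection are fetched, and using hypothetical column
/// indexes and row counts:
///
/// ```ignore
/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
/// use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
///
/// let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
/// // Only fetch pages for two leaf columns...
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 3]);
/// // ...and only the pages overlapping the first 1000 rows
/// let num_rows = builder.metadata().file_metadata().num_rows() as usize;
/// let selection = RowSelection::from(vec![
///     RowSelector::select(1000),
///     RowSelector::skip(num_rows - 1000),
/// ]);
/// let stream = builder
///     .with_projection(mask)
///     .with_row_selection(selection)
///     .build()?;
/// ```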
795///
796///
797/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
798pub struct ParquetRecordBatchStream<T> {
799    metadata: Arc<ParquetMetaData>,
800
801    schema: SchemaRef,
802
803    row_groups: VecDeque<usize>,
804
805    projection: ProjectionMask,
806
807    batch_size: usize,
808
809    selection: Option<RowSelection>,
810
811    /// This is an option so it can be moved into a future
812    reader_factory: Option<ReaderFactory<T>>,
813
814    state: StreamState<T>,
815}
816
817impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
818    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
819        f.debug_struct("ParquetRecordBatchStream")
820            .field("metadata", &self.metadata)
821            .field("schema", &self.schema)
822            .field("batch_size", &self.batch_size)
823            .field("projection", &self.projection)
824            .field("state", &self.state)
825            .finish()
826    }
827}
828
829impl<T> ParquetRecordBatchStream<T> {
830    /// Returns the projected [`SchemaRef`] for reading the parquet file.
831    ///
832    /// Note that the schema metadata will be stripped here. See
833    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
834    pub fn schema(&self) -> &SchemaRef {
835        &self.schema
836    }
837}
838
839impl<T> ParquetRecordBatchStream<T>
840where
841    T: AsyncFileReader + Unpin + Send + 'static,
842{
843    /// Fetches the next row group from the stream.
844    ///
845    /// Users can continue to call this function to get row groups and decode them concurrently.
846    ///
847    /// ## Notes
848    ///
849    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
850    ///
851    /// ## Returns
852    ///
853    /// - `Ok(None)` if the stream has ended.
854    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
855    /// - `Ok(Some(reader))` which holds all the data for the row group.
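    ///
    /// ## Example
    ///
    /// A sketch of decoding each row group off the async runtime, assuming
    /// `stream` is an already-built [`ParquetRecordBatchStream`]:
    ///
    /// ```ignore
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // `reader` already holds all the data it needs, so decoding can run on a
    ///     // blocking thread instead of an async runtime worker thread
    ///     let batches = tokio::task::spawn_blocking(move || {
    ///         reader.collect::<Result<Vec<_>, _>>()
    ///     })
    ///     .await
    ///     .expect("decode task panicked")?;
    ///     // process `batches` ...
    /// }
    /// ```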
856    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
857        loop {
858            match &mut self.state {
859                StreamState::Decoding(_) | StreamState::Reading(_) => {
860                    return Err(ParquetError::General(
861                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
862                    ))
863                }
864                StreamState::Init => {
865                    let row_group_idx = match self.row_groups.pop_front() {
866                        Some(idx) => idx,
867                        None => return Ok(None),
868                    };
869
870                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
871
872                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
873
874                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
875
876                    let (reader_factory, maybe_reader) = reader_factory
877                        .read_row_group(
878                            row_group_idx,
879                            selection,
880                            self.projection.clone(),
881                            self.batch_size,
882                        )
883                        .await
884                        .inspect_err(|_| {
885                            self.state = StreamState::Error;
886                        })?;
887                    self.reader_factory = Some(reader_factory);
888
889                    if let Some(reader) = maybe_reader {
890                        return Ok(Some(reader));
891                    } else {
892                        // All rows skipped, read next row group
893                        continue;
894                    }
895                }
896                StreamState::Error => return Ok(None), // Ends the stream as error happens.
897            }
898        }
899    }
900}
901
902impl<T> Stream for ParquetRecordBatchStream<T>
903where
904    T: AsyncFileReader + Unpin + Send + 'static,
905{
906    type Item = Result<RecordBatch>;
907
908    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
909        loop {
910            match &mut self.state {
911                StreamState::Decoding(batch_reader) => match batch_reader.next() {
912                    Some(Ok(batch)) => {
913                        return Poll::Ready(Some(Ok(batch)));
914                    }
915                    Some(Err(e)) => {
916                        self.state = StreamState::Error;
917                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
918                    }
919                    None => self.state = StreamState::Init,
920                },
921                StreamState::Init => {
922                    let row_group_idx = match self.row_groups.pop_front() {
923                        Some(idx) => idx,
924                        None => return Poll::Ready(None),
925                    };
926
927                    let reader = self.reader_factory.take().expect("lost reader factory");
928
929                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
930
931                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
932
933                    let fut = reader
934                        .read_row_group(
935                            row_group_idx,
936                            selection,
937                            self.projection.clone(),
938                            self.batch_size,
939                        )
940                        .boxed();
941
942                    self.state = StreamState::Reading(fut)
943                }
944                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
945                    Ok((reader_factory, maybe_reader)) => {
946                        self.reader_factory = Some(reader_factory);
947                        match maybe_reader {
948                            // Read records from [`ParquetRecordBatchReader`]
949                            Some(reader) => self.state = StreamState::Decoding(reader),
950                            // All rows skipped, read next row group
951                            None => self.state = StreamState::Init,
952                        }
953                    }
954                    Err(e) => {
955                        self.state = StreamState::Error;
956                        return Poll::Ready(Some(Err(e)));
957                    }
958                },
959                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
960            }
961        }
962    }
963}
964
965/// An in-memory collection of column chunks
966struct InMemoryRowGroup<'a> {
967    offset_index: Option<&'a [OffsetIndexMetaData]>,
968    /// Column chunks for this row group
969    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
970    row_count: usize,
971    row_group_idx: usize,
972    metadata: &'a ParquetMetaData,
973}
974
975impl InMemoryRowGroup<'_> {
976    /// Fetches any additional column data specified in `projection` that is not already
977    /// present in `self.column_chunks`.
978    ///
979    /// If `selection` is provided, only the pages required for the selection
980    /// are fetched. Otherwise, all pages are fetched.
981    async fn fetch<T: AsyncFileReader + Send>(
982        &mut self,
983        input: &mut T,
984        projection: &ProjectionMask,
985        selection: Option<&RowSelection>,
986        batch_size: usize,
987        cache_mask: Option<&ProjectionMask>,
988    ) -> Result<()> {
989        let metadata = self.metadata.row_group(self.row_group_idx);
990        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
991            let expanded_selection =
992                selection.expand_to_batch_boundaries(batch_size, self.row_count);
993            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
994            // `RowSelection`
995            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
996
997            let fetch_ranges = self
998                .column_chunks
999                .iter()
1000                .zip(metadata.columns())
1001                .enumerate()
1002                .filter(|&(idx, (chunk, _chunk_meta))| {
1003                    chunk.is_none() && projection.leaf_included(idx)
1004                })
1005                .flat_map(|(idx, (_chunk, chunk_meta))| {
1006                    // If the first page does not start at the beginning of the column,
1007                    // then we need to also fetch a dictionary page.
1008                    let mut ranges: Vec<Range<u64>> = vec![];
1009                    let (start, _len) = chunk_meta.byte_range();
1010                    match offset_index[idx].page_locations.first() {
1011                        Some(first) if first.offset as u64 != start => {
1012                            ranges.push(start..first.offset as u64);
1013                        }
1014                        _ => (),
1015                    }
1016
1017                    // Expand selection to batch boundaries only for cached columns
1018                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1019                    if use_expanded {
1020                        ranges.extend(
1021                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1022                        );
1023                    } else {
1024                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1025                    }
1026                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1027
1028                    ranges
1029                })
1030                .collect();
1031
1032            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1033            let mut page_start_offsets = page_start_offsets.into_iter();
1034
1035            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1036                if chunk.is_some() || !projection.leaf_included(idx) {
1037                    continue;
1038                }
1039
1040                if let Some(offsets) = page_start_offsets.next() {
1041                    let mut chunks = Vec::with_capacity(offsets.len());
1042                    for _ in 0..offsets.len() {
1043                        chunks.push(chunk_data.next().unwrap());
1044                    }
1045
1046                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1047                        length: metadata.column(idx).byte_range().1 as usize,
1048                        data: offsets
1049                            .into_iter()
1050                            .map(|x| x as usize)
1051                            .zip(chunks.into_iter())
1052                            .collect(),
1053                    }))
1054                }
1055            }
1056        } else {
1057            let fetch_ranges = self
1058                .column_chunks
1059                .iter()
1060                .enumerate()
1061                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1062                .map(|(idx, _chunk)| {
1063                    let column = metadata.column(idx);
1064                    let (start, length) = column.byte_range();
1065                    start..(start + length)
1066                })
1067                .collect();
1068
1069            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1070
1071            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1072                if chunk.is_some() || !projection.leaf_included(idx) {
1073                    continue;
1074                }
1075
1076                if let Some(data) = chunk_data.next() {
1077                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
1078                        offset: metadata.column(idx).byte_range().0 as usize,
1079                        data,
1080                    }));
1081                }
1082            }
1083        }
1084
1085        Ok(())
1086    }
1087}
1088
1089impl RowGroups for InMemoryRowGroup<'_> {
1090    fn num_rows(&self) -> usize {
1091        self.row_count
1092    }
1093
1094    /// Return chunks for column i
1095    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1096        match &self.column_chunks[i] {
1097            None => Err(ParquetError::General(format!(
1098                "Invalid column index {i}, column was not fetched"
1099            ))),
1100            Some(data) => {
1101                let page_locations = self
1102                    .offset_index
1103                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
1104                    .filter(|index| !index.is_empty())
1105                    .map(|index| index[i].page_locations.clone());
1106                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1107                let page_reader = SerializedPageReader::new(
1108                    data.clone(),
1109                    column_chunk_metadata,
1110                    self.row_count,
1111                    page_locations,
1112                )?;
1113                let page_reader = page_reader.add_crypto_context(
1114                    self.row_group_idx,
1115                    i,
1116                    self.metadata,
1117                    column_chunk_metadata,
1118                )?;
1119
1120                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1121
1122                Ok(Box::new(ColumnChunkIterator {
1123                    reader: Some(Ok(page_reader)),
1124                }))
1125            }
1126        }
1127    }
1128}
1129
1130/// An in-memory column chunk
1131#[derive(Clone)]
1132enum ColumnChunkData {
1133    /// Column chunk data representing only a subset of data pages
1134    Sparse {
1135        /// Length of the full column chunk
1136        length: usize,
1137        /// Subset of data pages included in this sparse chunk.
1138        ///
1139        /// Each element is a tuple of (page offset within file, page data).
1140        /// Each entry is a complete page and the list is ordered by offset.
1141        data: Vec<(usize, Bytes)>,
1142    },
1143    /// Full column chunk and the offset within the original file
1144    Dense { offset: usize, data: Bytes },
1145}
1146
1147impl ColumnChunkData {
1148    /// Return the data for this column chunk at the given offset
1149    fn get(&self, start: u64) -> Result<Bytes> {
1150        match &self {
1151            ColumnChunkData::Sparse { data, .. } => data
1152                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1153                .map(|idx| data[idx].1.clone())
1154                .map_err(|_| {
1155                    ParquetError::General(format!(
1156                        "Invalid offset in sparse column chunk data: {start}"
1157                    ))
1158                }),
1159            ColumnChunkData::Dense { offset, data } => {
1160                let start = start as usize - *offset;
1161                Ok(data.slice(start..))
1162            }
1163        }
1164    }
1165}
1166
1167impl Length for ColumnChunkData {
1168    /// Return the total length of the full column chunk
1169    fn len(&self) -> u64 {
1170        match &self {
1171            ColumnChunkData::Sparse { length, .. } => *length as u64,
1172            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1173        }
1174    }
1175}
1176
1177impl ChunkReader for ColumnChunkData {
1178    type T = bytes::buf::Reader<Bytes>;
1179
1180    fn get_read(&self, start: u64) -> Result<Self::T> {
1181        Ok(self.get(start)?.reader())
1182    }
1183
1184    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1185        Ok(self.get(start)?.slice(..length))
1186    }
1187}
1188
1189/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1190struct ColumnChunkIterator {
1191    reader: Option<Result<Box<dyn PageReader>>>,
1192}
1193
1194impl Iterator for ColumnChunkIterator {
1195    type Item = Result<Box<dyn PageReader>>;
1196
1197    fn next(&mut self) -> Option<Self::Item> {
1198        self.reader.take()
1199    }
1200}
1201
1202impl PageIterator for ColumnChunkIterator {}
1203
1204#[cfg(test)]
1205mod tests {
1206    use super::*;
1207    use crate::arrow::arrow_reader::{
1208        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1209    };
1210    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1211    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1212    use crate::arrow::ArrowWriter;
1213    use crate::file::metadata::ParquetMetaDataReader;
1214    use crate::file::properties::WriterProperties;
1215    use arrow::compute::kernels::cmp::eq;
1216    use arrow::error::Result as ArrowResult;
1217    use arrow_array::builder::{ListBuilder, StringBuilder};
1218    use arrow_array::cast::AsArray;
1219    use arrow_array::types::Int32Type;
1220    use arrow_array::{
1221        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1222        StructArray, UInt64Array,
1223    };
1224    use arrow_schema::{DataType, Field, Schema};
1225    use futures::{StreamExt, TryStreamExt};
1226    use rand::{rng, Rng};
1227    use std::collections::HashMap;
1228    use std::sync::{Arc, Mutex};
1229    use tempfile::tempfile;
1230
1231    #[derive(Clone)]
1232    struct TestReader {
1233        data: Bytes,
1234        metadata: Option<Arc<ParquetMetaData>>,
1235        requests: Arc<Mutex<Vec<Range<usize>>>>,
1236    }
1237
1238    impl TestReader {
1239        fn new(data: Bytes) -> Self {
1240            Self {
1241                data,
1242                metadata: Default::default(),
1243                requests: Default::default(),
1244            }
1245        }
1246    }
1247
1248    impl AsyncFileReader for TestReader {
1249        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1250            let range = range.clone();
1251            self.requests
1252                .lock()
1253                .unwrap()
1254                .push(range.start as usize..range.end as usize);
1255            futures::future::ready(Ok(self
1256                .data
1257                .slice(range.start as usize..range.end as usize)))
1258            .boxed()
1259        }
1260
1261        fn get_metadata<'a>(
1262            &'a mut self,
1263            options: Option<&'a ArrowReaderOptions>,
1264        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1265            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1266                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1267            );
1268            self.metadata = Some(Arc::new(
1269                metadata_reader.parse_and_finish(&self.data).unwrap(),
1270            ));
1271            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1272        }
1273    }
1274
1275    #[tokio::test]
1276    async fn test_async_reader() {
1277        let testdata = arrow::util::test_util::parquet_test_data();
1278        let path = format!("{testdata}/alltypes_plain.parquet");
1279        let data = Bytes::from(std::fs::read(path).unwrap());
1280
1281        let async_reader = TestReader::new(data.clone());
1282
1283        let requests = async_reader.requests.clone();
1284        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1285            .await
1286            .unwrap();
1287
1288        let metadata = builder.metadata().clone();
1289        assert_eq!(metadata.num_row_groups(), 1);
1290
1291        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1292        let stream = builder
1293            .with_projection(mask.clone())
1294            .with_batch_size(1024)
1295            .build()
1296            .unwrap();
1297
1298        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1299
1300        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1301            .unwrap()
1302            .with_projection(mask)
1303            .with_batch_size(104)
1304            .build()
1305            .unwrap()
1306            .collect::<ArrowResult<Vec<_>>>()
1307            .unwrap();
1308
1309        assert_eq!(async_batches, sync_batches);
1310
1311        let requests = requests.lock().unwrap();
1312        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1313        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1314
1315        assert_eq!(
1316            &requests[..],
1317            &[
1318                offset_1 as usize..(offset_1 + length_1) as usize,
1319                offset_2 as usize..(offset_2 + length_2) as usize
1320            ]
1321        );
1322    }
1323
1324    #[tokio::test]
1325    async fn test_async_reader_with_next_row_group() {
1326        let testdata = arrow::util::test_util::parquet_test_data();
1327        let path = format!("{testdata}/alltypes_plain.parquet");
1328        let data = Bytes::from(std::fs::read(path).unwrap());
1329
1330        let async_reader = TestReader::new(data.clone());
1331
1332        let requests = async_reader.requests.clone();
1333        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1334            .await
1335            .unwrap();
1336
1337        let metadata = builder.metadata().clone();
1338        assert_eq!(metadata.num_row_groups(), 1);
1339
1340        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1341        let mut stream = builder
1342            .with_projection(mask.clone())
1343            .with_batch_size(1024)
1344            .build()
1345            .unwrap();
1346
1347        let mut readers = vec![];
1348        while let Some(reader) = stream.next_row_group().await.unwrap() {
1349            readers.push(reader);
1350        }
1351
1352        let async_batches: Vec<_> = readers
1353            .into_iter()
1354            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1355            .collect();
1356
1357        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1358            .unwrap()
1359            .with_projection(mask)
1360            .with_batch_size(104)
1361            .build()
1362            .unwrap()
1363            .collect::<ArrowResult<Vec<_>>>()
1364            .unwrap();
1365
1366        assert_eq!(async_batches, sync_batches);
1367
1368        let requests = requests.lock().unwrap();
1369        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1370        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1371
1372        assert_eq!(
1373            &requests[..],
1374            &[
1375                offset_1 as usize..(offset_1 + length_1) as usize,
1376                offset_2 as usize..(offset_2 + length_2) as usize
1377            ]
1378        );
1379    }
1380
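    // Reading with the page index enabled should populate the offset and column
    // indexes for every column of every row group.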
1381    #[tokio::test]
1382    async fn test_async_reader_with_index() {
1383        let testdata = arrow::util::test_util::parquet_test_data();
1384        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1385        let data = Bytes::from(std::fs::read(path).unwrap());
1386
1387        let async_reader = TestReader::new(data.clone());
1388
1389        let options = ArrowReaderOptions::new().with_page_index(true);
1390        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1391            .await
1392            .unwrap();
1393
1394        // The builder should have page and offset indexes loaded now
1395        let metadata_with_index = builder.metadata();
1396        assert_eq!(metadata_with_index.num_row_groups(), 1);
1397
1398        // Check offset indexes are present for all columns
1399        let offset_index = metadata_with_index.offset_index().unwrap();
1400        let column_index = metadata_with_index.column_index().unwrap();
1401
1402        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1403        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1404
1405        let num_columns = metadata_with_index
1406            .file_metadata()
1407            .schema_descr()
1408            .num_columns();
1409
1410        // Check page indexes are present for all columns
1411        offset_index
1412            .iter()
1413            .for_each(|x| assert_eq!(x.len(), num_columns));
1414        column_index
1415            .iter()
1416            .for_each(|x| assert_eq!(x.len(), num_columns));
1417
1418        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1419        let stream = builder
1420            .with_projection(mask.clone())
1421            .with_batch_size(1024)
1422            .build()
1423            .unwrap();
1424
1425        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1426
1427        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1428            .unwrap()
1429            .with_projection(mask)
1430            .with_batch_size(1024)
1431            .build()
1432            .unwrap()
1433            .collect::<ArrowResult<Vec<_>>>()
1434            .unwrap();
1435
1436        assert_eq!(async_batches, sync_batches);
1437    }
1438
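    // A row limit of 1 should produce identical results from the async and sync readers.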
1439    #[tokio::test]
1440    async fn test_async_reader_with_limit() {
1441        let testdata = arrow::util::test_util::parquet_test_data();
1442        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1443        let data = Bytes::from(std::fs::read(path).unwrap());
1444
1445        let metadata = ParquetMetaDataReader::new()
1446            .parse_and_finish(&data)
1447            .unwrap();
1448        let metadata = Arc::new(metadata);
1449
1450        assert_eq!(metadata.num_row_groups(), 1);
1451
1452        let async_reader = TestReader::new(data.clone());
1453
1454        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1455            .await
1456            .unwrap();
1457
1458        assert_eq!(builder.metadata().num_row_groups(), 1);
1459
1460        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1461        let stream = builder
1462            .with_projection(mask.clone())
1463            .with_batch_size(1024)
1464            .with_limit(1)
1465            .build()
1466            .unwrap();
1467
1468        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1469
1470        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1471            .unwrap()
1472            .with_projection(mask)
1473            .with_batch_size(1024)
1474            .with_limit(1)
1475            .build()
1476            .unwrap()
1477            .collect::<ArrowResult<Vec<_>>>()
1478            .unwrap();
1479
1480        assert_eq!(async_batches, sync_batches);
1481    }
1482
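    // Exercise page skipping with a selection that skips and selects whole pages as
    // well as ranges that straddle page boundaries.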
1483    #[tokio::test]
1484    async fn test_async_reader_skip_pages() {
1485        let testdata = arrow::util::test_util::parquet_test_data();
1486        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1487        let data = Bytes::from(std::fs::read(path).unwrap());
1488
1489        let async_reader = TestReader::new(data.clone());
1490
1491        let options = ArrowReaderOptions::new().with_page_index(true);
1492        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1493            .await
1494            .unwrap();
1495
1496        assert_eq!(builder.metadata().num_row_groups(), 1);
1497
1498        let selection = RowSelection::from(vec![
1499            RowSelector::skip(21),   // Skip first page
1500            RowSelector::select(21), // Select page to boundary
1501            RowSelector::skip(41),   // Skip multiple pages
1502            RowSelector::select(41), // Select multiple pages
1503            RowSelector::skip(25),   // Skip page across boundary
1504            RowSelector::select(25), // Select across page boundary
1505            RowSelector::skip(7116), // Skip to final page boundary
1506            RowSelector::select(10), // Select final page
1507        ]);
1508
1509        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1510
1511        let stream = builder
1512            .with_projection(mask.clone())
1513            .with_row_selection(selection.clone())
1514            .build()
1515            .expect("building stream");
1516
1517        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1518
1519        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1520            .unwrap()
1521            .with_projection(mask)
1522            .with_batch_size(1024)
1523            .with_row_selection(selection)
1524            .build()
1525            .unwrap()
1526            .collect::<ArrowResult<Vec<_>>>()
1527            .unwrap();
1528
1529        assert_eq!(async_batches, sync_batches);
1530    }
1531
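    // Fuzz test: build 100 random skip/select patterns over the 7300-row file and
    // check that the stream returns exactly the selected number of rows.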
1532    #[tokio::test]
1533    async fn test_fuzz_async_reader_selection() {
1534        let testdata = arrow::util::test_util::parquet_test_data();
1535        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1536        let data = Bytes::from(std::fs::read(path).unwrap());
1537
1538        let mut rand = rng();
1539
1540        for _ in 0..100 {
1541            let mut expected_rows = 0;
1542            let mut total_rows = 0;
1543            let mut skip = false;
1544            let mut selectors = vec![];
1545
1546            while total_rows < 7300 {
1547                let row_count: usize = rand.random_range(1..100);
1548
1549                let row_count = row_count.min(7300 - total_rows);
1550
1551                selectors.push(RowSelector { row_count, skip });
1552
1553                total_rows += row_count;
1554                if !skip {
1555                    expected_rows += row_count;
1556                }
1557
1558                skip = !skip;
1559            }
1560
1561            let selection = RowSelection::from(selectors);
1562
1563            let async_reader = TestReader::new(data.clone());
1564
1565            let options = ArrowReaderOptions::new().with_page_index(true);
1566            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1567                .await
1568                .unwrap();
1569
1570            assert_eq!(builder.metadata().num_row_groups(), 1);
1571
1572            let col_idx: usize = rand.random_range(0..13);
1573            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1574
1575            let stream = builder
1576                .with_projection(mask.clone())
1577                .with_row_selection(selection.clone())
1578                .build()
1579                .expect("building stream");
1580
1581            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1582
1583            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1584
1585            assert_eq!(actual_rows, expected_rows);
1586        }
1587    }
1588
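    // A leading zero-row selector must not change the rows that are returned.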
1589    #[tokio::test]
1590    async fn test_async_reader_zero_row_selector() {
        // See https://github.com/apache/arrow-rs/issues/2669
1592        let testdata = arrow::util::test_util::parquet_test_data();
1593        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1594        let data = Bytes::from(std::fs::read(path).unwrap());
1595
1596        let mut rand = rng();
1597
1598        let mut expected_rows = 0;
1599        let mut total_rows = 0;
1600        let mut skip = false;
1601        let mut selectors = vec![];
1602
1603        selectors.push(RowSelector {
1604            row_count: 0,
1605            skip: false,
1606        });
1607
1608        while total_rows < 7300 {
1609            let row_count: usize = rand.random_range(1..100);
1610
1611            let row_count = row_count.min(7300 - total_rows);
1612
1613            selectors.push(RowSelector { row_count, skip });
1614
1615            total_rows += row_count;
1616            if !skip {
1617                expected_rows += row_count;
1618            }
1619
1620            skip = !skip;
1621        }
1622
1623        let selection = RowSelection::from(selectors);
1624
1625        let async_reader = TestReader::new(data.clone());
1626
1627        let options = ArrowReaderOptions::new().with_page_index(true);
1628        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1629            .await
1630            .unwrap();
1631
1632        assert_eq!(builder.metadata().num_row_groups(), 1);
1633
1634        let col_idx: usize = rand.random_range(0..13);
1635        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1636
1637        let stream = builder
1638            .with_projection(mask.clone())
1639            .with_row_selection(selection.clone())
1640            .build()
1641            .expect("building stream");
1642
1643        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1644
1645        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1646
1647        assert_eq!(actual_rows, expected_rows);
1648    }
1649
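    // A single predicate on column `a` prunes rows before the projection is decoded;
    // one request is expected for the predicate and one for the projection.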
1650    #[tokio::test]
1651    async fn test_row_filter() {
1652        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1653        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1654        let data = RecordBatch::try_from_iter([
1655            ("a", Arc::new(a) as ArrayRef),
1656            ("b", Arc::new(b) as ArrayRef),
1657        ])
1658        .unwrap();
1659
1660        let mut buf = Vec::with_capacity(1024);
1661        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1662        writer.write(&data).unwrap();
1663        writer.close().unwrap();
1664
1665        let data: Bytes = buf.into();
1666        let metadata = ParquetMetaDataReader::new()
1667            .parse_and_finish(&data)
1668            .unwrap();
1669        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1670
1671        let test = TestReader::new(data);
1672        let requests = test.requests.clone();
1673
1674        let a_scalar = StringArray::from_iter_values(["b"]);
1675        let a_filter = ArrowPredicateFn::new(
1676            ProjectionMask::leaves(&parquet_schema, vec![0]),
1677            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1678        );
1679
1680        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1681
1682        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1683        let stream = ParquetRecordBatchStreamBuilder::new(test)
1684            .await
1685            .unwrap()
1686            .with_projection(mask.clone())
1687            .with_batch_size(1024)
1688            .with_row_filter(filter)
1689            .build()
1690            .unwrap();
1691
1692        let batches: Vec<_> = stream.try_collect().await.unwrap();
1693        assert_eq!(batches.len(), 1);
1694
1695        let batch = &batches[0];
1696        assert_eq!(batch.num_columns(), 2);
1697
1698        // Filter should have kept only rows with "b" in column 0
1699        assert_eq!(
1700            batch.column(0).as_ref(),
1701            &StringArray::from_iter_values(["b", "b", "b"])
1702        );
1703        assert_eq!(
1704            batch.column(1).as_ref(),
1705            &StringArray::from_iter_values(["2", "3", "4"])
1706        );
1707
1708        // Should only have made 2 requests:
1709        // * First request fetches data for evaluating the predicate
1710        // * Second request fetches data for evaluating the projection
1711        assert_eq!(requests.lock().unwrap().len(), 2);
1712    }
1713
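    // Two chained predicates are evaluated in order; each predicate and the final
    // projection fetch their own data, giving three requests in total.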
1714    #[tokio::test]
1715    async fn test_two_row_filters() {
1716        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1717        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1718        let c = Int32Array::from_iter(0..6);
1719        let data = RecordBatch::try_from_iter([
1720            ("a", Arc::new(a) as ArrayRef),
1721            ("b", Arc::new(b) as ArrayRef),
1722            ("c", Arc::new(c) as ArrayRef),
1723        ])
1724        .unwrap();
1725
1726        let mut buf = Vec::with_capacity(1024);
1727        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1728        writer.write(&data).unwrap();
1729        writer.close().unwrap();
1730
1731        let data: Bytes = buf.into();
1732        let metadata = ParquetMetaDataReader::new()
1733            .parse_and_finish(&data)
1734            .unwrap();
1735        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1736
1737        let test = TestReader::new(data);
1738        let requests = test.requests.clone();
1739
1740        let a_scalar = StringArray::from_iter_values(["b"]);
1741        let a_filter = ArrowPredicateFn::new(
1742            ProjectionMask::leaves(&parquet_schema, vec![0]),
1743            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1744        );
1745
1746        let b_scalar = StringArray::from_iter_values(["4"]);
1747        let b_filter = ArrowPredicateFn::new(
1748            ProjectionMask::leaves(&parquet_schema, vec![1]),
1749            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1750        );
1751
1752        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1753
1754        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1755        let stream = ParquetRecordBatchStreamBuilder::new(test)
1756            .await
1757            .unwrap()
1758            .with_projection(mask.clone())
1759            .with_batch_size(1024)
1760            .with_row_filter(filter)
1761            .build()
1762            .unwrap();
1763
1764        let batches: Vec<_> = stream.try_collect().await.unwrap();
1765        assert_eq!(batches.len(), 1);
1766
1767        let batch = &batches[0];
1768        assert_eq!(batch.num_rows(), 1);
1769        assert_eq!(batch.num_columns(), 2);
1770
1771        let col = batch.column(0);
1772        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1773        assert_eq!(val, "b");
1774
1775        let col = batch.column(1);
1776        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1777        assert_eq!(val, 3);
1778
1779        // Should only have made 3 requests
1780        // * First request fetches data for evaluating the first predicate
1781        // * Second request fetches data for evaluating the second predicate
1782        // * Third request fetches data for evaluating the projection
1783        assert_eq!(requests.lock().unwrap().len(), 3);
1784    }
1785
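    // Apply limit and offset across a file written with two row groups of three rows each.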
1786    #[tokio::test]
1787    async fn test_limit_multiple_row_groups() {
1788        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1789        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1790        let c = Int32Array::from_iter(0..6);
1791        let data = RecordBatch::try_from_iter([
1792            ("a", Arc::new(a) as ArrayRef),
1793            ("b", Arc::new(b) as ArrayRef),
1794            ("c", Arc::new(c) as ArrayRef),
1795        ])
1796        .unwrap();
1797
1798        let mut buf = Vec::with_capacity(1024);
1799        let props = WriterProperties::builder()
1800            .set_max_row_group_size(3)
1801            .build();
1802        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1803        writer.write(&data).unwrap();
1804        writer.close().unwrap();
1805
1806        let data: Bytes = buf.into();
1807        let metadata = ParquetMetaDataReader::new()
1808            .parse_and_finish(&data)
1809            .unwrap();
1810
1811        assert_eq!(metadata.num_row_groups(), 2);
1812
1813        let test = TestReader::new(data);
1814
1815        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1816            .await
1817            .unwrap()
1818            .with_batch_size(1024)
1819            .with_limit(4)
1820            .build()
1821            .unwrap();
1822
1823        let batches: Vec<_> = stream.try_collect().await.unwrap();
1824        // Expect one batch for each row group
1825        assert_eq!(batches.len(), 2);
1826
1827        let batch = &batches[0];
        // First batch should contain all 3 rows of the first row group
1829        assert_eq!(batch.num_rows(), 3);
1830        assert_eq!(batch.num_columns(), 3);
1831        let col2 = batch.column(2).as_primitive::<Int32Type>();
1832        assert_eq!(col2.values(), &[0, 1, 2]);
1833
1834        let batch = &batches[1];
1835        // Second batch should trigger the limit and only have one row
1836        assert_eq!(batch.num_rows(), 1);
1837        assert_eq!(batch.num_columns(), 3);
1838        let col2 = batch.column(2).as_primitive::<Int32Type>();
1839        assert_eq!(col2.values(), &[3]);
1840
1841        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1842            .await
1843            .unwrap()
1844            .with_offset(2)
1845            .with_limit(3)
1846            .build()
1847            .unwrap();
1848
1849        let batches: Vec<_> = stream.try_collect().await.unwrap();
1850        // Expect one batch for each row group
1851        assert_eq!(batches.len(), 2);
1852
1853        let batch = &batches[0];
1854        // First batch should contain one row
1855        assert_eq!(batch.num_rows(), 1);
1856        assert_eq!(batch.num_columns(), 3);
1857        let col2 = batch.column(2).as_primitive::<Int32Type>();
1858        assert_eq!(col2.values(), &[2]);
1859
1860        let batch = &batches[1];
1861        // Second batch should contain two rows
1862        assert_eq!(batch.num_rows(), 2);
1863        assert_eq!(batch.num_columns(), 3);
1864        let col2 = batch.column(2).as_primitive::<Int32Type>();
1865        assert_eq!(col2.values(), &[3, 4]);
1866
1867        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1868            .await
1869            .unwrap()
1870            .with_offset(4)
1871            .with_limit(20)
1872            .build()
1873            .unwrap();
1874
1875        let batches: Vec<_> = stream.try_collect().await.unwrap();
1876        // Should skip first row group
1877        assert_eq!(batches.len(), 1);
1878
1879        let batch = &batches[0];
1880        // First batch should contain two rows
1881        assert_eq!(batch.num_rows(), 2);
1882        assert_eq!(batch.num_columns(), 3);
1883        let col2 = batch.column(2).as_primitive::<Int32Type>();
1884        assert_eq!(col2.values(), &[4, 5]);
1885    }
1886
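    // Combine row filters with the page index; the two predicates together select 730 rows.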
1887    #[tokio::test]
1888    async fn test_row_filter_with_index() {
1889        let testdata = arrow::util::test_util::parquet_test_data();
1890        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1891        let data = Bytes::from(std::fs::read(path).unwrap());
1892
1893        let metadata = ParquetMetaDataReader::new()
1894            .parse_and_finish(&data)
1895            .unwrap();
1896        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1897
1898        assert_eq!(metadata.num_row_groups(), 1);
1899
1900        let async_reader = TestReader::new(data.clone());
1901
1902        let a_filter =
1903            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1904                Ok(batch.column(0).as_boolean().clone())
1905            });
1906
1907        let b_scalar = Int8Array::from(vec![2]);
1908        let b_filter = ArrowPredicateFn::new(
1909            ProjectionMask::leaves(&parquet_schema, vec![2]),
1910            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1911        );
1912
1913        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1914
1915        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1916
1917        let options = ArrowReaderOptions::new().with_page_index(true);
1918        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1919            .await
1920            .unwrap()
1921            .with_projection(mask.clone())
1922            .with_batch_size(1024)
1923            .with_row_filter(filter)
1924            .build()
1925            .unwrap();
1926
1927        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1928
1929        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1930
1931        assert_eq!(total_rows, 730);
1932    }
1933
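    // Build a RowSelection that skips every other page and check that only the
    // selected pages are fetched from the underlying reader.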
1934    #[tokio::test]
1935    #[allow(deprecated)]
1936    async fn test_in_memory_row_group_sparse() {
1937        let testdata = arrow::util::test_util::parquet_test_data();
1938        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1939        let data = Bytes::from(std::fs::read(path).unwrap());
1940
1941        let metadata = ParquetMetaDataReader::new()
1942            .with_page_indexes(true)
1943            .parse_and_finish(&data)
1944            .unwrap();
1945
1946        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1947
1948        let mut metadata_builder = metadata.into_builder();
1949        let mut row_groups = metadata_builder.take_row_groups();
1950        row_groups.truncate(1);
1951        let row_group_meta = row_groups.pop().unwrap();
1952
1953        let metadata = metadata_builder
1954            .add_row_group(row_group_meta)
1955            .set_column_index(None)
1956            .set_offset_index(Some(vec![offset_index.clone()]))
1957            .build();
1958
1959        let metadata = Arc::new(metadata);
1960
1961        let num_rows = metadata.row_group(0).num_rows();
1962
1963        assert_eq!(metadata.num_row_groups(), 1);
1964
1965        let async_reader = TestReader::new(data.clone());
1966
1967        let requests = async_reader.requests.clone();
1968        let (_, fields) = parquet_to_arrow_schema_and_fields(
1969            metadata.file_metadata().schema_descr(),
1970            ProjectionMask::all(),
1971            None,
1972        )
1973        .unwrap();
1974
1975        let _schema_desc = metadata.file_metadata().schema_descr();
1976
1977        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1978
1979        let reader_factory = ReaderFactory {
1980            metadata,
1981            fields: fields.map(Arc::new),
1982            input: async_reader,
1983            filter: None,
1984            limit: None,
1985            offset: None,
1986            metrics: ArrowReaderMetrics::disabled(),
1987            max_predicate_cache_size: 0,
1988        };
1989
1990        let mut skip = true;
1991        let mut pages = offset_index[0].page_locations.iter().peekable();
1992
        // Set up the `RowSelection` so that we skip every other page, selecting the last page
1994        let mut selectors = vec![];
1995        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1996        while let Some(page) = pages.next() {
1997            let num_rows = if let Some(next_page) = pages.peek() {
1998                next_page.first_row_index - page.first_row_index
1999            } else {
2000                num_rows - page.first_row_index
2001            };
2002
2003            if skip {
2004                selectors.push(RowSelector::skip(num_rows as usize));
2005            } else {
2006                selectors.push(RowSelector::select(num_rows as usize));
2007                let start = page.offset as usize;
2008                let end = start + page.compressed_page_size as usize;
2009                expected_page_requests.push(start..end);
2010            }
2011            skip = !skip;
2012        }
2013
2014        let selection = RowSelection::from(selectors);
2015
2016        let (_factory, _reader) = reader_factory
2017            .read_row_group(0, Some(selection), projection.clone(), 48)
2018            .await
2019            .expect("reading row group");
2020
2021        let requests = requests.lock().unwrap();
2022
2023        assert_eq!(&requests[..], &expected_page_requests)
2024    }
2025
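    // The configured batch size should be clamped to the number of rows in the file
    // so the reader does not over-allocate.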
2026    #[tokio::test]
2027    async fn test_batch_size_overallocate() {
2028        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only has 8 rows
2030        let path = format!("{testdata}/alltypes_plain.parquet");
2031        let data = Bytes::from(std::fs::read(path).unwrap());
2032
2033        let async_reader = TestReader::new(data.clone());
2034
2035        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2036            .await
2037            .unwrap();
2038
2039        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2040
2041        let stream = builder
2042            .with_projection(ProjectionMask::all())
2043            .with_batch_size(1024)
2044            .build()
2045            .unwrap();
2046        assert_ne!(1024, file_rows);
2047        assert_eq!(stream.batch_size, file_rows);
2048    }
2049
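    // Bloom filter lookup on a file whose column metadata does not record bloom_filter_length.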
2050    #[tokio::test]
2051    async fn test_get_row_group_column_bloom_filter_without_length() {
2052        let testdata = arrow::util::test_util::parquet_test_data();
2053        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2054        let data = Bytes::from(std::fs::read(path).unwrap());
2055        test_get_row_group_column_bloom_filter(data, false).await;
2056    }
2057
2058    #[tokio::test]
2059    async fn test_parquet_record_batch_stream_schema() {
2060        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2061            schema.flattened_fields().iter().map(|f| f.name()).collect()
2062        }
2063
        // ParquetRecordBatchReaderBuilder::schema differs from
        // ParquetRecordBatchReader::schema and RecordBatch::schema in the schema
        // contents it returns (the custom metadata attached to the schema and the
        // fields included). Test that this behaviour remains consistent, for the
        // synchronous and asynchronous readers alike.
2070
        // Prepare data for a schema with nested fields and custom metadata
2072        let mut metadata = HashMap::with_capacity(1);
2073        metadata.insert("key".to_string(), "value".to_string());
2074
2075        let nested_struct_array = StructArray::from(vec![
2076            (
2077                Arc::new(Field::new("d", DataType::Utf8, true)),
2078                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2079            ),
2080            (
2081                Arc::new(Field::new("e", DataType::Utf8, true)),
2082                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2083            ),
2084        ]);
2085        let struct_array = StructArray::from(vec![
2086            (
2087                Arc::new(Field::new("a", DataType::Int32, true)),
2088                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2089            ),
2090            (
2091                Arc::new(Field::new("b", DataType::UInt64, true)),
2092                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2093            ),
2094            (
2095                Arc::new(Field::new(
2096                    "c",
2097                    nested_struct_array.data_type().clone(),
2098                    true,
2099                )),
2100                Arc::new(nested_struct_array) as ArrayRef,
2101            ),
2102        ]);
2103
2104        let schema =
2105            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2106        let record_batch = RecordBatch::from(struct_array)
2107            .with_schema(schema.clone())
2108            .unwrap();
2109
2110        // Write parquet with custom metadata in schema
2111        let mut file = tempfile().unwrap();
2112        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2113        writer.write(&record_batch).unwrap();
2114        writer.close().unwrap();
2115
2116        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices in the projection mask, expected names of all fields in the output schema)
2118        let projections = [
2119            (vec![], vec![]),
2120            (vec![0], vec!["a"]),
2121            (vec![0, 1], vec!["a", "b"]),
2122            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2123            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2124        ];
2125
2126        // Ensure we're consistent for each of these projections
2127        for (indices, expected_projected_names) in projections {
2128            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2129                // Builder schema should preserve all fields and metadata
2130                assert_eq!(get_all_field_names(&builder), all_fields);
2131                assert_eq!(builder.metadata, metadata);
2132                // Reader & batch schema should show only projected fields, and no metadata
2133                assert_eq!(get_all_field_names(&reader), expected_projected_names);
2134                assert_eq!(reader.metadata, HashMap::default());
2135                assert_eq!(get_all_field_names(&batch), expected_projected_names);
2136                assert_eq!(batch.metadata, HashMap::default());
2137            };
2138
2139            let builder =
2140                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2141            let sync_builder_schema = builder.schema().clone();
2142            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2143            let mut reader = builder.with_projection(mask).build().unwrap();
2144            let sync_reader_schema = reader.schema();
2145            let batch = reader.next().unwrap().unwrap();
2146            let sync_batch_schema = batch.schema();
2147            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2148
            // The asynchronous reader should behave the same
2150            let file = tokio::fs::File::from(file.try_clone().unwrap());
2151            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2152            let async_builder_schema = builder.schema().clone();
2153            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2154            let mut reader = builder.with_projection(mask).build().unwrap();
2155            let async_reader_schema = reader.schema().clone();
2156            let batch = reader.next().await.unwrap().unwrap();
2157            let async_batch_schema = batch.schema();
2158            assert_schemas(
2159                async_builder_schema,
2160                async_reader_schema,
2161                async_batch_schema,
2162            );
2163        }
2164    }
2165
2166    #[tokio::test]
2167    async fn test_get_row_group_column_bloom_filter_with_length() {
        // Rewrite the data into a new parquet file that records bloom_filter_length
2169        let testdata = arrow::util::test_util::parquet_test_data();
2170        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2171        let data = Bytes::from(std::fs::read(path).unwrap());
2172        let async_reader = TestReader::new(data.clone());
2173        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2174            .await
2175            .unwrap();
2176        let schema = builder.schema().clone();
2177        let stream = builder.build().unwrap();
2178        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2179
2180        let mut parquet_data = Vec::new();
2181        let props = WriterProperties::builder()
2182            .set_bloom_filter_enabled(true)
2183            .build();
2184        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2185        for batch in batches {
2186            writer.write(&batch).unwrap();
2187        }
2188        writer.close().unwrap();
2189
2190        // test the new parquet file
2191        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2192    }
2193
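    // Shared helper: read the bloom filter for row group 0, column 0 and check
    // membership for a value that is present and one that is not.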
2194    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2195        let async_reader = TestReader::new(data.clone());
2196
2197        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2198            .await
2199            .unwrap();
2200
2201        let metadata = builder.metadata();
2202        assert_eq!(metadata.num_row_groups(), 1);
2203        let row_group = metadata.row_group(0);
2204        let column = row_group.column(0);
2205        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2206
2207        let sbbf = builder
2208            .get_row_group_column_bloom_filter(0, 0)
2209            .await
2210            .unwrap()
2211            .unwrap();
2212        assert!(sbbf.check(&"Hello"));
2213        assert!(!sbbf.check(&"Hello_Not_Exists"));
2214    }
2215
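    // Row selections over a file with a nested list column; the number of rows
    // returned must match the selection, including skips that cross page boundaries.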
2216    #[tokio::test]
2217    async fn test_nested_skip() {
2218        let schema = Arc::new(Schema::new(vec![
2219            Field::new("col_1", DataType::UInt64, false),
2220            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2221        ]));
2222
        // Writer properties forcing multiple small data pages per row group
2224        let props = WriterProperties::builder()
2225            .set_data_page_row_count_limit(256)
2226            .set_write_batch_size(256)
2227            .set_max_row_group_size(1024);
2228
2229        // Write data
2230        let mut file = tempfile().unwrap();
2231        let mut writer =
2232            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2233
2234        let mut builder = ListBuilder::new(StringBuilder::new());
2235        for id in 0..1024 {
2236            match id % 3 {
2237                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2238                1 => builder.append_value([Some(format!("id_{id}"))]),
2239                _ => builder.append_null(),
2240            }
2241        }
2242        let refs = vec![
2243            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2244            Arc::new(builder.finish()) as ArrayRef,
2245        ];
2246
2247        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2248        writer.write(&batch).unwrap();
2249        writer.close().unwrap();
2250
2251        let selections = [
2252            RowSelection::from(vec![
2253                RowSelector::skip(313),
2254                RowSelector::select(1),
2255                RowSelector::skip(709),
2256                RowSelector::select(1),
2257            ]),
2258            RowSelection::from(vec![
2259                RowSelector::skip(255),
2260                RowSelector::select(1),
2261                RowSelector::skip(767),
2262                RowSelector::select(1),
2263            ]),
2264            RowSelection::from(vec![
2265                RowSelector::select(255),
2266                RowSelector::skip(1),
2267                RowSelector::select(767),
2268                RowSelector::skip(1),
2269            ]),
2270            RowSelection::from(vec![
2271                RowSelector::skip(254),
2272                RowSelector::select(1),
2273                RowSelector::select(1),
2274                RowSelector::skip(767),
2275                RowSelector::select(1),
2276            ]),
2277        ];
2278
2279        for selection in selections {
2280            let expected = selection.row_count();
2281            // Read data
2282            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2283                tokio::fs::File::from_std(file.try_clone().unwrap()),
2284                ArrowReaderOptions::new().with_page_index(true),
2285            )
2286            .await
2287            .unwrap();
2288
2289            reader = reader.with_row_selection(selection);
2290
2291            let mut stream = reader.build().unwrap();
2292
2293            let mut total_rows = 0;
2294            while let Some(rb) = stream.next().await {
2295                let rb = rb.unwrap();
2296                total_rows += rb.num_rows();
2297            }
2298            assert_eq!(total_rows, expected);
2299        }
2300    }
2301
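    // Row filters where one predicate projects a leaf inside a nested struct column.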
2302    #[tokio::test]
2303    async fn test_row_filter_nested() {
2304        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2305        let b = StructArray::from(vec![
2306            (
2307                Arc::new(Field::new("aa", DataType::Utf8, true)),
2308                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2309            ),
2310            (
2311                Arc::new(Field::new("bb", DataType::Utf8, true)),
2312                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2313            ),
2314        ]);
2315        let c = Int32Array::from_iter(0..6);
2316        let data = RecordBatch::try_from_iter([
2317            ("a", Arc::new(a) as ArrayRef),
2318            ("b", Arc::new(b) as ArrayRef),
2319            ("c", Arc::new(c) as ArrayRef),
2320        ])
2321        .unwrap();
2322
2323        let mut buf = Vec::with_capacity(1024);
2324        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2325        writer.write(&data).unwrap();
2326        writer.close().unwrap();
2327
2328        let data: Bytes = buf.into();
2329        let metadata = ParquetMetaDataReader::new()
2330            .parse_and_finish(&data)
2331            .unwrap();
2332        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2333
2334        let test = TestReader::new(data);
2335        let requests = test.requests.clone();
2336
2337        let a_scalar = StringArray::from_iter_values(["b"]);
2338        let a_filter = ArrowPredicateFn::new(
2339            ProjectionMask::leaves(&parquet_schema, vec![0]),
2340            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2341        );
2342
2343        let b_scalar = StringArray::from_iter_values(["4"]);
2344        let b_filter = ArrowPredicateFn::new(
2345            ProjectionMask::leaves(&parquet_schema, vec![2]),
2346            move |batch| {
                // Filter on the struct's second child (`bb`); since it is the only
                // projected leaf, it appears as column 0 of the predicate batch's struct.
2348                let struct_array = batch
2349                    .column(0)
2350                    .as_any()
2351                    .downcast_ref::<StructArray>()
2352                    .unwrap();
2353                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2354            },
2355        );
2356
2357        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2358
2359        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2360        let stream = ParquetRecordBatchStreamBuilder::new(test)
2361            .await
2362            .unwrap()
2363            .with_projection(mask.clone())
2364            .with_batch_size(1024)
2365            .with_row_filter(filter)
2366            .build()
2367            .unwrap();
2368
2369        let batches: Vec<_> = stream.try_collect().await.unwrap();
2370        assert_eq!(batches.len(), 1);
2371
2372        let batch = &batches[0];
2373        assert_eq!(batch.num_rows(), 1);
2374        assert_eq!(batch.num_columns(), 2);
2375
2376        let col = batch.column(0);
2377        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2378        assert_eq!(val, "b");
2379
2380        let col = batch.column(1);
2381        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2382        assert_eq!(val, 3);
2383
2384        // Should only have made 3 requests
2385        // * First request fetches data for evaluating the first predicate
2386        // * Second request fetches data for evaluating the second predicate
2387        // * Third request fetches data for evaluating the projection
2388        assert_eq!(requests.lock().unwrap().len(), 3);
2389    }
2390
2391    #[tokio::test]
2392    async fn test_cache_projection_excludes_nested_columns() {
2393        use arrow_array::{ArrayRef, StringArray};
2394
2395        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
2396        let a = StringArray::from_iter_values(["r1", "r2"]);
2397        let b = StructArray::from(vec![
2398            (
2399                Arc::new(Field::new("aa", DataType::Utf8, true)),
2400                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
2401            ),
2402            (
2403                Arc::new(Field::new("bb", DataType::Utf8, true)),
2404                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
2405            ),
2406        ]);
2407
2408        let schema = Arc::new(Schema::new(vec![
2409            Field::new("a", DataType::Utf8, true),
2410            Field::new("b", b.data_type().clone(), true),
2411        ]));
2412
2413        let mut buf = Vec::new();
2414        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
2415        let batch = RecordBatch::try_from_iter([
2416            ("a", Arc::new(a) as ArrayRef),
2417            ("b", Arc::new(b) as ArrayRef),
2418        ])
2419        .unwrap();
2420        writer.write(&batch).unwrap();
2421        writer.close().unwrap();
2422
2423        // Load Parquet metadata
2424        let data: Bytes = buf.into();
2425        let metadata = ParquetMetaDataReader::new()
2426            .parse_and_finish(&data)
2427            .unwrap();
2428        let metadata = Arc::new(metadata);
2429
2430        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
2431        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
2432        let parquet_schema = metadata.file_metadata().schema_descr();
2433        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
2434
2435        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
2436            Ok(arrow_array::BooleanArray::from(vec![
2437                true;
2438                batch.num_rows()
2439            ]))
2440        });
2441        let filter = RowFilter::new(vec![Box::new(always_true)]);
2442
2443        // Construct a ReaderFactory and compute cache projection
2444        let reader_factory = ReaderFactory {
2445            metadata: Arc::clone(&metadata),
2446            fields: None,
2447            input: TestReader::new(data),
2448            filter: Some(filter),
2449            limit: None,
2450            offset: None,
2451            metrics: ArrowReaderMetrics::disabled(),
2452            max_predicate_cache_size: 0,
2453        };
2454
2455        // Provide an output projection that also selects the same nested leaf
2456        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
2457
2458        // Expect None since nested columns should be excluded from cache projection
2459        assert!(cache_projection.is_none());
2460    }
2461
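    // Regression test: an explicitly empty offset index must not cause a panic when
    // reading a row group.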
2462    #[tokio::test]
2463    #[allow(deprecated)]
2464    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2465        use tokio::fs::File;
2466        let testdata = arrow::util::test_util::parquet_test_data();
2467        let path = format!("{testdata}/alltypes_plain.parquet");
2468        let mut file = File::open(&path).await.unwrap();
2469        let file_size = file.metadata().await.unwrap().len();
2470        let mut metadata = ParquetMetaDataReader::new()
2471            .with_page_indexes(true)
2472            .load_and_finish(&mut file, file_size)
2473            .await
2474            .unwrap();
2475
2476        metadata.set_offset_index(Some(vec![]));
2477        let options = ArrowReaderOptions::new().with_page_index(true);
2478        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2479        let reader =
2480            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2481                .build()
2482                .unwrap();
2483
2484        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2485        assert_eq!(result.len(), 1);
2486    }
2487
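    // Companion to the test above, using a file whose offset index is populated.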
2488    #[tokio::test]
2489    #[allow(deprecated)]
2490    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2491        use tokio::fs::File;
2492        let testdata = arrow::util::test_util::parquet_test_data();
2493        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2494        let mut file = File::open(&path).await.unwrap();
2495        let file_size = file.metadata().await.unwrap().len();
2496        let metadata = ParquetMetaDataReader::new()
2497            .with_page_indexes(true)
2498            .load_and_finish(&mut file, file_size)
2499            .await
2500            .unwrap();
2501
2502        let options = ArrowReaderOptions::new().with_page_index(true);
2503        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2504        let reader =
2505            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2506                .build()
2507                .unwrap();
2508
2509        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2510        assert_eq!(result.len(), 8);
2511    }
2512
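    // Regression test: metadata round-tripped through ParquetMetaDataWriter must not
    // cause a panic when the column chunks are read.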
2513    #[tokio::test]
2514    #[allow(deprecated)]
2515    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2516        use tempfile::TempDir;
2517        use tokio::fs::File;
2518        fn write_metadata_to_local_file(
2519            metadata: ParquetMetaData,
2520            file: impl AsRef<std::path::Path>,
2521        ) {
2522            use crate::file::metadata::ParquetMetaDataWriter;
2523            use std::fs::File;
2524            let file = File::create(file).unwrap();
2525            ParquetMetaDataWriter::new(file, &metadata)
2526                .finish()
2527                .unwrap()
2528        }
2529
2530        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2531            use std::fs::File;
2532            let file = File::open(file).unwrap();
2533            ParquetMetaDataReader::new()
2534                .with_page_indexes(true)
2535                .parse_and_finish(&file)
2536                .unwrap()
2537        }
2538
2539        let testdata = arrow::util::test_util::parquet_test_data();
2540        let path = format!("{testdata}/alltypes_plain.parquet");
2541        let mut file = File::open(&path).await.unwrap();
2542        let file_size = file.metadata().await.unwrap().len();
2543        let metadata = ParquetMetaDataReader::new()
2544            .with_page_indexes(true)
2545            .load_and_finish(&mut file, file_size)
2546            .await
2547            .unwrap();
2548
2549        let tempdir = TempDir::new().unwrap();
2550        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2551        write_metadata_to_local_file(metadata, &metadata_path);
2552        let metadata = read_metadata_from_local_file(&metadata_path);
2553
2554        let options = ArrowReaderOptions::new().with_page_index(true);
2555        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2556        let reader =
2557            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2558                .build()
2559                .unwrap();
2560
        // This collect previously panicked in the column chunk reader
2562        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2563        assert_eq!(result.len(), 1);
2564    }
2565
2566    #[tokio::test]
2567    async fn test_cached_array_reader_sparse_offset_error() {
2568        use futures::TryStreamExt;
2569
2570        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2571        use arrow_array::{BooleanArray, RecordBatch};
2572
2573        let testdata = arrow::util::test_util::parquet_test_data();
2574        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2575        let data = Bytes::from(std::fs::read(path).unwrap());
2576
2577        let async_reader = TestReader::new(data);
2578
2579        // Enable page index so the fetch logic loads only required pages
2580        let options = ArrowReaderOptions::new().with_page_index(true);
2581        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2582            .await
2583            .unwrap();
2584
2585        // Skip the first 22 rows (entire first Parquet page) and then select the
2586        // next 3 rows (22, 23, 24). This means the fetch step will not include
2587        // the first page starting at file offset 0.
2588        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2589
2590        // Trivial predicate on column 0 that always returns `true`. Using the
2591        // same column in both predicate and projection activates the caching
2592        // layer (Producer/Consumer pattern).
2593        let parquet_schema = builder.parquet_schema();
2594        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2595        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2596            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2597        });
2598        let filter = RowFilter::new(vec![Box::new(always_true)]);
2599
2600        // Build the stream with batch size 8 so the cache reads whole batches
2601        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2602        let stream = builder
2603            .with_batch_size(8)
2604            .with_projection(proj)
2605            .with_row_selection(selection)
2606            .with_row_filter(filter)
2607            .build()
2608            .unwrap();
2609
        // Collecting the stream must succeed; this scenario previously failed with
        // a sparse column chunk offset error.
2612        let _result: Vec<_> = stream.try_collect().await.unwrap();
2613    }
2614}