parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46    RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
72/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
73///
74/// Notes:
75///
76/// 1. There is a default implementation for types that implement [`AsyncRead`]
77///    and [`AsyncSeek`], for example [`tokio::fs::File`].
78///
79/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
80///    is enabled, implements this interface for [`ObjectStore`].
81///
82/// [`ObjectStore`]: object_store::ObjectStore
83///
84/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
85pub trait AsyncFileReader: Send {
86    /// Retrieve the bytes in `range`
87    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
90    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91        async move {
92            let mut result = Vec::with_capacity(ranges.len());
93
94            for range in ranges.into_iter() {
95                let data = self.get_bytes(range).await?;
96                result.push(data);
97            }
98
99            Ok(result)
100        }
101        .boxed()
102    }
103
104    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
105    ///
106    /// This is an asynchronous operation as it may involve reading the file
107    /// footer and potentially other metadata from disk or a remote source.
108    ///
109    /// Reading data from Parquet requires the metadata to understand the
110    /// schema, row groups, and location of pages within the file. This metadata
111    /// is stored primarily in the footer of the Parquet file, and can be read using
112    /// [`ParquetMetaDataReader`].
113    ///
114    /// However, implementations can significantly speed up reading Parquet by
115    /// supplying cached metadata or pre-fetched metadata via this API.
116    ///
117    /// # Parameters
118    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
119    ///   and other options that affect how the metadata is read.
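    ///
    /// # Example: caching metadata (sketch)
    ///
    /// A minimal sketch, not part of this crate's API, of a wrapper that
    /// remembers the metadata returned by the first call so subsequent calls
    /// avoid re-reading the footer. The `CachingReader` name and structure are
    /// illustrative assumptions.
    ///
    /// ```no_run
    /// # use std::ops::Range;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use futures::future::{BoxFuture, FutureExt};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::async_reader::AsyncFileReader;
    /// # use parquet::errors::Result;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// struct CachingReader<R> {
    ///     inner: R,
    ///     cached: Option<Arc<ParquetMetaData>>,
    /// }
    ///
    /// impl<R: AsyncFileReader> AsyncFileReader for CachingReader<R> {
    ///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
    ///         // Data reads are simply delegated to the wrapped reader
    ///         self.inner.get_bytes(range)
    ///     }
    ///
    ///     fn get_metadata<'a>(
    ///         &'a mut self,
    ///         options: Option<&'a ArrowReaderOptions>,
    ///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///         async move {
    ///             // Return the cached metadata if it has already been read
    ///             if let Some(metadata) = &self.cached {
    ///                 return Ok(Arc::clone(metadata));
    ///             }
    ///             let metadata = self.inner.get_metadata(options).await?;
    ///             self.cached = Some(Arc::clone(&metadata));
    ///             Ok(metadata)
    ///         }
    ///         .boxed()
    ///     }
    /// }
    /// ```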
120    fn get_metadata<'a>(
121        &'a mut self,
122        options: Option<&'a ArrowReaderOptions>,
123    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`]
127impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129        self.as_mut().get_bytes(range)
130    }
131
132    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133        self.as_mut().get_byte_ranges(ranges)
134    }
135
136    fn get_metadata<'a>(
137        &'a mut self,
138        options: Option<&'a ArrowReaderOptions>,
139    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140        self.as_mut().get_metadata(options)
141    }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146        async move {
147            self.seek(SeekFrom::End(-(suffix as i64))).await?;
148            let mut buf = Vec::with_capacity(suffix);
149            self.take(suffix as _).read_to_end(&mut buf).await?;
150            Ok(buf.into())
151        }
152        .boxed()
153    }
154}
155
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158        async move {
159            self.seek(SeekFrom::Start(range.start)).await?;
160
161            let to_read = range.end - range.start;
162            let mut buffer = Vec::with_capacity(to_read.try_into()?);
163            let read = self.take(to_read).read_to_end(&mut buffer).await?;
164            if read as u64 != to_read {
165                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166            }
167
168            Ok(buffer.into())
169        }
170        .boxed()
171    }
172
173    fn get_metadata<'a>(
174        &'a mut self,
175        options: Option<&'a ArrowReaderOptions>,
176    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177        async move {
178            let metadata_reader = ParquetMetaDataReader::new()
179                .with_page_indexes(options.is_some_and(|o| o.page_index));
180
181            #[cfg(feature = "encryption")]
182            let metadata_reader = metadata_reader.with_decryption_properties(
183                options.and_then(|o| o.file_decryption_properties.as_ref()),
184            );
185
186            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
187            Ok(Arc::new(parquet_metadata))
188        }
189        .boxed()
190    }
191}
192
193impl ArrowReaderMetadata {
194    /// Returns a new [`ArrowReaderMetadata`] for this builder
195    ///
196    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
197    pub async fn load_async<T: AsyncFileReader>(
198        input: &mut T,
199        options: ArrowReaderOptions,
200    ) -> Result<Self> {
201        let metadata = input.get_metadata(Some(&options)).await?;
202        Self::try_new(metadata, options)
203    }
204}
205
206#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
208///
209/// Allows sharing the same builder for both the sync and async versions, whilst also not
210/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
211pub struct AsyncReader<T>(T);
212
/// A builder for reading parquet files from an `async` source as a [`ParquetRecordBatchStream`]
214///
215/// This can be used to decode a Parquet file in streaming fashion (without
216/// downloading the whole file at once) from a remote source, such as an object store.
217///
218/// This builder handles reading the parquet file metadata, allowing consumers
219/// to use this information to select what specific columns, row groups, etc.
220/// they wish to be read by the resulting stream.
221///
222/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
223///
224/// See [`ArrowReaderBuilder`] for additional member functions
225pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
226
227impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
228    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
229    /// specified source.
230    ///
231    /// # Example
232    /// ```
233    /// # #[tokio::main(flavor="current_thread")]
234    /// # async fn main() {
235    /// #
236    /// # use arrow_array::RecordBatch;
237    /// # use arrow::util::pretty::pretty_format_batches;
238    /// # use futures::TryStreamExt;
239    /// #
240    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
241    /// #
242    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
243    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
244    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
245    /// #     assert_eq!(
246    /// #          &actual_lines, expected_lines,
247    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
248    /// #          expected_lines, actual_lines
249    /// #      );
250    /// #  }
251    /// #
252    /// # let testdata = arrow::util::test_util::parquet_test_data();
253    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
254    /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
255    /// // another async I/O reader such as a reader from an object store.
256    /// let file = tokio::fs::File::open(path).await.unwrap();
257    ///
258    /// // Configure options for reading from the async source
259    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
260    ///     .await
261    ///     .unwrap();
262    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
263    /// // a stream that can be used to incrementally read the data in batches
264    /// let stream = builder.build().unwrap();
265    /// // In this example, we collect the stream into a Vec<RecordBatch>
266    /// // but real applications would likely process the batches as they are read
267    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
268    /// // Demonstrate the results are as expected
269    /// assert_batches_eq(
270    ///     &results,
271    ///     &[
272    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
273    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
274    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
275    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
276    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
277    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
278    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
279    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
280    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
281    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
282    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
283    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
284    ///      ],
285    ///  );
286    /// # }
287    /// ```
288    ///
289    /// # Example configuring options and reading metadata
290    ///
291    /// There are many options that control the behavior of the reader, such as
292    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
293    ///
294    /// ```
295    /// # #[tokio::main(flavor="current_thread")]
296    /// # async fn main() {
297    /// #
298    /// # use arrow_array::RecordBatch;
299    /// # use arrow::util::pretty::pretty_format_batches;
300    /// # use futures::TryStreamExt;
301    /// #
302    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
303    /// #
304    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
305    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
306    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
307    /// #     assert_eq!(
308    /// #          &actual_lines, expected_lines,
309    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
310    /// #          expected_lines, actual_lines
311    /// #      );
312    /// #  }
313    /// #
314    /// # let testdata = arrow::util::test_util::parquet_test_data();
315    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
316    /// // As before, use tokio::fs::File to read data using an async I/O.
317    /// let file = tokio::fs::File::open(path).await.unwrap();
318    ///
    /// // Configure options for reading from the async source. In this case we set the batch size
    /// // to 3, which produces batches of at most 3 rows at a time.
321    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
322    ///     .await
323    ///     .unwrap()
324    ///     .with_batch_size(3);
325    ///
326    /// // We can also read the metadata to inspect the schema and other metadata
327    /// // before actually reading the data
328    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read the columns at indexes 1, 2 and 6
330    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
331    ///
332    /// let stream = builder.with_projection(mask).build().unwrap();
333    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
334    /// // Print out the results
335    /// assert_batches_eq(
336    ///     &results,
337    ///     &[
338    ///         "+----------+-------------+-----------+",
339    ///         "| bool_col | tinyint_col | float_col |",
340    ///         "+----------+-------------+-----------+",
341    ///         "| true     | 0           | 0.0       |",
342    ///         "| false    | 1           | 1.1       |",
343    ///         "| true     | 0           | 0.0       |",
344    ///         "| false    | 1           | 1.1       |",
345    ///         "| true     | 0           | 0.0       |",
346    ///         "| false    | 1           | 1.1       |",
347    ///         "| true     | 0           | 0.0       |",
348    ///         "| false    | 1           | 1.1       |",
349    ///         "+----------+-------------+-----------+",
350    ///      ],
351    ///  );
352    ///
    /// // The file has 8 rows and the batch size is 3, so we expect
    /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
355    /// assert_eq!(results.len(), 3);
356    /// # }
357    /// ```
358    pub async fn new(input: T) -> Result<Self> {
359        Self::new_with_options(input, Default::default()).await
360    }
361
362    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
363    /// and [`ArrowReaderOptions`].
364    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
365        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
366        Ok(Self::new_with_metadata(input, metadata))
367    }
368
369    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
370    ///
    /// This allows loading the metadata once and using it to create multiple builders with
    /// potentially different settings, which can then be read in parallel.
373    ///
374    /// # Example of reading from multiple streams in parallel
375    ///
376    /// ```
377    /// # use std::fs::metadata;
378    /// # use std::sync::Arc;
379    /// # use bytes::Bytes;
380    /// # use arrow_array::{Int32Array, RecordBatch};
381    /// # use arrow_schema::{DataType, Field, Schema};
382    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
383    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
384    /// # use tempfile::tempfile;
385    /// # use futures::StreamExt;
386    /// # #[tokio::main(flavor="current_thread")]
387    /// # async fn main() {
388    /// #
389    /// # let mut file = tempfile().unwrap();
390    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
391    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
392    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
393    /// # writer.write(&batch).unwrap();
394    /// # writer.close().unwrap();
395    /// // open file with parquet data
396    /// let mut file = tokio::fs::File::from_std(file);
397    /// // load metadata once
398    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
399    /// // create two readers, a and b, from the same underlying file
400    /// // without reading the metadata again
401    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
402    ///     file.try_clone().await.unwrap(),
403    ///     meta.clone()
404    /// ).build().unwrap();
405    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
406    ///
407    /// // Can read batches from both readers in parallel
408    /// assert_eq!(
409    ///   a.next().await.unwrap().unwrap(),
410    ///   b.next().await.unwrap().unwrap(),
411    /// );
412    /// # }
413    /// ```
414    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
415        Self::new_builder(AsyncReader(input), metadata)
416    }
417
418    /// Read bloom filter for a column in a row group
419    ///
420    /// Returns `None` if the column does not have a bloom filter
421    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown, have been applied.
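    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, with an illustrative file path and column index, of
    /// probing a bloom filter before deciding whether a row group needs to be
    /// read at all:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// // Probe the bloom filter of the first column in the first row group
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     if !sbbf.check(&42_i64) {
    ///         // 42 is definitely not present in this row group, so the row
    ///         // group could be skipped entirely, e.g. via `with_row_groups`
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```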
423    pub async fn get_row_group_column_bloom_filter(
424        &mut self,
425        row_group_idx: usize,
426        column_idx: usize,
427    ) -> Result<Option<Sbbf>> {
428        let metadata = self.metadata.row_group(row_group_idx);
429        let column_metadata = metadata.column(column_idx);
430
431        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
432            offset
433                .try_into()
434                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
435        } else {
436            return Ok(None);
437        };
438
439        let buffer = match column_metadata.bloom_filter_length() {
440            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
441            None => self
442                .input
443                .0
444                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
445        }
446        .await?;
447
448        let (header, bitset_offset) =
449            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
450
451        match header.algorithm {
452            BloomFilterAlgorithm::BLOCK(_) => {
453                // this match exists to future proof the singleton algorithm enum
454            }
455        }
456        match header.compression {
457            BloomFilterCompression::UNCOMPRESSED(_) => {
458                // this match exists to future proof the singleton compression enum
459            }
460        }
461        match header.hash {
462            BloomFilterHash::XXHASH(_) => {
463                // this match exists to future proof the singleton hash enum
464            }
465        }
466
467        let bitset = match column_metadata.bloom_filter_length() {
468            Some(_) => buffer.slice(
469                (TryInto::<usize>::try_into(bitset_offset).unwrap()
470                    - TryInto::<usize>::try_into(offset).unwrap())..,
471            ),
472            None => {
473                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
474                    ParquetError::General("Bloom filter length is invalid".to_string())
475                })?;
476                self.input
477                    .0
478                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
479                    .await?
480            }
481        };
482        Ok(Some(Sbbf::new(&bitset)))
483    }
484
485    /// Build a new [`ParquetRecordBatchStream`]
486    ///
487    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
488    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
489        let num_row_groups = self.metadata.row_groups().len();
490
491        let row_groups = match self.row_groups {
492            Some(row_groups) => {
493                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
494                    return Err(general_err!(
495                        "row group {} out of bounds 0..{}",
496                        col,
497                        num_row_groups
498                    ));
499                }
500                row_groups.into()
501            }
502            None => (0..self.metadata.row_groups().len()).collect(),
503        };
504
        // Avoid allocating a buffer larger than the total number of rows in the file
506        let batch_size = self
507            .batch_size
508            .min(self.metadata.file_metadata().num_rows() as usize);
509        let reader_factory = ReaderFactory {
510            input: self.input.0,
511            filter: self.filter,
512            metadata: self.metadata.clone(),
513            fields: self.fields,
514            limit: self.limit,
515            offset: self.offset,
516            metrics: self.metrics,
517            max_predicate_cache_size: self.max_predicate_cache_size,
518        };
519
520        // Ensure schema of ParquetRecordBatchStream respects projection, and does
521        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
522        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
523            Some(DataType::Struct(fields)) => {
524                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
525            }
526            None => Fields::empty(),
527            _ => unreachable!("Must be Struct for root type"),
528        };
529        let schema = Arc::new(Schema::new(projected_fields));
530
531        Ok(ParquetRecordBatchStream {
532            metadata: self.metadata,
533            batch_size,
534            row_groups,
535            projection: self.projection,
536            selection: self.selection,
537            schema,
538            reader_factory: Some(reader_factory),
539            state: StreamState::Init,
540        })
541    }
542}
543
544/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
545///
/// Note: If all rows are filtered out in the row group (e.g. by filters, limit or
547/// offset), returns `None` for the reader.
548type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
549
550/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
551/// [`ParquetRecordBatchReader`]
552struct ReaderFactory<T> {
553    metadata: Arc<ParquetMetaData>,
554
555    /// Top level parquet schema
556    fields: Option<Arc<ParquetField>>,
557
558    input: T,
559
560    /// Optional filter
561    filter: Option<RowFilter>,
562
563    /// Limit to apply to remaining row groups.  
564    limit: Option<usize>,
565
    /// Offset to apply to remaining row groups
567    offset: Option<usize>,
568
569    /// Metrics
570    metrics: ArrowReaderMetrics,
571
572    /// Maximum size of the predicate cache
573    max_predicate_cache_size: usize,
574}
575
576impl<T> ReaderFactory<T>
577where
578    T: AsyncFileReader + Send,
579{
580    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
581    ///
582    /// Updates the `limit` and `offset` of the reader factory
583    ///
584    /// Note: this captures self so that the resulting future has a static lifetime
585    async fn read_row_group(
586        mut self,
587        row_group_idx: usize,
588        selection: Option<RowSelection>,
589        projection: ProjectionMask,
590        batch_size: usize,
591    ) -> ReadResult<T> {
592        // TODO: calling build_array multiple times is wasteful
593
594        let meta = self.metadata.row_group(row_group_idx);
595        let offset_index = self
596            .metadata
597            .offset_index()
            // filter out empty offset indexes (old versions wrote Some(vec![]) when none was present)
599            .filter(|index| !index.is_empty())
600            .map(|x| x[row_group_idx].as_slice());
601
        // Cache columns that are both used by the filters and selected in the output
        // projection, so pages decoded while filtering can be reused for the output
603        let cache_projection = match self.compute_cache_projection(&projection) {
604            Some(projection) => projection,
605            None => ProjectionMask::none(meta.columns().len()),
606        };
607        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
608            batch_size,
609            self.max_predicate_cache_size,
610        )));
611
612        let mut row_group = InMemoryRowGroup {
613            // schema: meta.schema_descr_ptr(),
614            row_count: meta.num_rows() as usize,
615            column_chunks: vec![None; meta.columns().len()],
616            offset_index,
617            row_group_idx,
618            metadata: self.metadata.as_ref(),
619        };
620
621        let cache_options_builder =
622            CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone());
623
624        let filter = self.filter.as_mut();
625        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627        // Update selection based on any filters
628        if let Some(filter) = filter {
629            let cache_options = cache_options_builder.clone().producer();
630
631            for predicate in filter.predicates.iter_mut() {
632                if !plan_builder.selects_any() {
633                    return Ok((self, None)); // ruled out entire row group
634                }
635
636                // (pre) Fetch only the columns that are selected by the predicate
637                let selection = plan_builder.selection();
638                // Fetch predicate columns; expand selection only for cached predicate columns
639                let cache_mask = Some(&cache_projection);
640                row_group
641                    .fetch(
642                        &mut self.input,
643                        predicate.projection(),
644                        selection,
645                        batch_size,
646                        cache_mask,
647                    )
648                    .await?;
649
650                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651                    .with_cache_options(Some(&cache_options))
652                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655            }
656        }
657
658        // Compute the number of rows in the selection before applying limit and offset
659        let rows_before = plan_builder
660            .num_rows_selected()
661            .unwrap_or(row_group.row_count);
662
663        if rows_before == 0 {
664            return Ok((self, None)); // ruled out entire row group
665        }
666
667        // Apply any limit and offset
668        let plan_builder = plan_builder
669            .limited(row_group.row_count)
670            .with_offset(self.offset)
671            .with_limit(self.limit)
672            .build_limited();
673
674        let rows_after = plan_builder
675            .num_rows_selected()
676            .unwrap_or(row_group.row_count);
677
678        // Update running offset and limit for after the current row group is read
679        if let Some(offset) = &mut self.offset {
            // The reduction is due to either the offset or the limit; since the limit is
            // only applied after the offset has been "exhausted", saturating_sub is sufficient here
682            *offset = offset.saturating_sub(rows_before - rows_after)
683        }
684
685        if rows_after == 0 {
686            return Ok((self, None)); // ruled out entire row group
687        }
688
689        if let Some(limit) = &mut self.limit {
690            *limit -= rows_after;
691        }
692        // fetch the pages needed for decoding
693        row_group
694            // Final projection fetch shouldn't expand selection for cache; pass None
695            .fetch(
696                &mut self.input,
697                &projection,
698                plan_builder.selection(),
699                batch_size,
700                None,
701            )
702            .await?;
703
704        let plan = plan_builder.build();
705
706        let cache_options = cache_options_builder.consumer();
707        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708            .with_cache_options(Some(&cache_options))
709            .build_array_reader(self.fields.as_deref(), &projection)?;
710
711        let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713        Ok((self, Some(reader)))
714    }
715
    /// Compute the columns that are used both by the filters and by the final (output) projection
717    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718        let filters = self.filter.as_ref()?;
719        let mut cache_projection = filters.predicates.first()?.projection().clone();
720        for predicate in filters.predicates.iter() {
721            cache_projection.union(predicate.projection());
722        }
723        cache_projection.intersect(projection);
724        self.exclude_nested_columns_from_cache(&cache_projection)
725    }
726
727    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
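    ///
    /// For example, a root column `s STRUCT<a INT32, b INT32>` maps to two
    /// parquet leaves and is therefore excluded, while a primitive root such
    /// as `id INT32` maps to a single leaf and remains eligible for caching.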
728    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
729        let schema = self.metadata.file_metadata().schema_descr();
730        let num_leaves = schema.num_columns();
731
732        // Count how many leaves each root column has
733        let num_roots = schema.root_schema().get_fields().len();
734        let mut root_leaf_counts = vec![0usize; num_roots];
735        for leaf_idx in 0..num_leaves {
736            let root_idx = schema.get_column_root_idx(leaf_idx);
737            root_leaf_counts[root_idx] += 1;
738        }
739
740        // Keep only leaves whose root has exactly one leaf (non-nested)
741        let mut included_leaves = Vec::new();
742        for leaf_idx in 0..num_leaves {
743            if mask.leaf_included(leaf_idx) {
744                let root_idx = schema.get_column_root_idx(leaf_idx);
745                if root_leaf_counts[root_idx] == 1 {
746                    included_leaves.push(leaf_idx);
747                }
748            }
749        }
750
751        if included_leaves.is_empty() {
752            None
753        } else {
754            Some(ProjectionMask::leaves(schema, included_leaves))
755        }
756    }
757}
758
759enum StreamState<T> {
760    /// At the start of a new row group, or the end of the parquet stream
761    Init,
762    /// Decoding a batch
763    Decoding(ParquetRecordBatchReader),
764    /// Reading data from input
765    Reading(BoxFuture<'static, ReadResult<T>>),
766    /// Error
767    Error,
768}
769
770impl<T> std::fmt::Debug for StreamState<T> {
771    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772        match self {
773            StreamState::Init => write!(f, "StreamState::Init"),
774            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
775            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
776            StreamState::Error => write!(f, "StreamState::Error"),
777        }
778    }
779}
780
/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
782///
783/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
784/// allowing users to decode record batches separately from I/O.
785///
786/// # I/O Buffering
787///
788/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection + filtering, etc.) and decodes the rows from those buffered pages.
790///
791/// For example, if all rows and columns are selected, the entire row group is
792/// buffered in memory during decode. This minimizes the number of IO operations
793/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds.
///
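/// # Example (sketch)
///
/// A minimal sketch, using an illustrative file path, that processes each
/// [`RecordBatch`] as soon as it is decoded rather than collecting them all:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> parquet::errors::Result<()> {
/// # let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
/// // Process each batch as soon as it is decoded instead of buffering them all
/// while let Some(batch) = stream.try_next().await? {
///     println!("read batch with {} rows", batch.num_rows());
/// }
/// # Ok(())
/// # }
/// ```
///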
797/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
798pub struct ParquetRecordBatchStream<T> {
799    metadata: Arc<ParquetMetaData>,
800
801    schema: SchemaRef,
802
803    row_groups: VecDeque<usize>,
804
805    projection: ProjectionMask,
806
807    batch_size: usize,
808
809    selection: Option<RowSelection>,
810
811    /// This is an option so it can be moved into a future
812    reader_factory: Option<ReaderFactory<T>>,
813
814    state: StreamState<T>,
815}
816
817impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
818    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
819        f.debug_struct("ParquetRecordBatchStream")
820            .field("metadata", &self.metadata)
821            .field("schema", &self.schema)
822            .field("batch_size", &self.batch_size)
823            .field("projection", &self.projection)
824            .field("state", &self.state)
825            .finish()
826    }
827}
828
829impl<T> ParquetRecordBatchStream<T> {
830    /// Returns the projected [`SchemaRef`] for reading the parquet file.
831    ///
832    /// Note that the schema metadata will be stripped here. See
833    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
834    pub fn schema(&self) -> &SchemaRef {
835        &self.schema
836    }
837}
838
839impl<T> ParquetRecordBatchStream<T>
840where
841    T: AsyncFileReader + Unpin + Send + 'static,
842{
843    /// Fetches the next row group from the stream.
844    ///
845    /// Users can continue to call this function to get row groups and decode them concurrently.
846    ///
847    /// ## Notes
848    ///
849    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
850    ///
851    /// ## Returns
852    ///
853    /// - `Ok(None)` if the stream has ended.
854    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
855    /// - `Ok(Some(reader))` which holds all the data for the row group.
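    ///
    /// ## Example (sketch)
    ///
    /// A minimal sketch that fetches each row group on the async runtime and
    /// decodes it on a blocking thread, assuming a `stream` built from an
    /// illustrative file path:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> parquet::errors::Result<()> {
    /// # let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // Decode on a blocking thread so the runtime is free to fetch the
    ///     // next row group while this one is being decoded
    ///     let batches: Vec<_> = tokio::task::spawn_blocking(move || {
    ///         reader.collect::<Result<Vec<_>, _>>()
    ///     })
    ///     .await
    ///     .expect("decode task panicked")
    ///     .unwrap();
    ///     // process `batches` ...
    ///     # drop(batches);
    /// }
    /// # Ok(())
    /// # }
    /// ```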
856    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
857        loop {
858            match &mut self.state {
859                StreamState::Decoding(_) | StreamState::Reading(_) => {
860                    return Err(ParquetError::General(
861                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
862                    ))
863                }
864                StreamState::Init => {
865                    let row_group_idx = match self.row_groups.pop_front() {
866                        Some(idx) => idx,
867                        None => return Ok(None),
868                    };
869
870                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
871
872                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
873
874                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
875
876                    let (reader_factory, maybe_reader) = reader_factory
877                        .read_row_group(
878                            row_group_idx,
879                            selection,
880                            self.projection.clone(),
881                            self.batch_size,
882                        )
883                        .await
884                        .inspect_err(|_| {
885                            self.state = StreamState::Error;
886                        })?;
887                    self.reader_factory = Some(reader_factory);
888
889                    if let Some(reader) = maybe_reader {
890                        return Ok(Some(reader));
891                    } else {
892                        // All rows skipped, read next row group
893                        continue;
894                    }
895                }
896                StreamState::Error => return Ok(None), // Ends the stream as error happens.
897            }
898        }
899    }
900}
901
902impl<T> Stream for ParquetRecordBatchStream<T>
903where
904    T: AsyncFileReader + Unpin + Send + 'static,
905{
906    type Item = Result<RecordBatch>;
907
908    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
909        loop {
910            match &mut self.state {
911                StreamState::Decoding(batch_reader) => match batch_reader.next() {
912                    Some(Ok(batch)) => {
913                        return Poll::Ready(Some(Ok(batch)));
914                    }
915                    Some(Err(e)) => {
916                        self.state = StreamState::Error;
917                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
918                    }
919                    None => self.state = StreamState::Init,
920                },
921                StreamState::Init => {
922                    let row_group_idx = match self.row_groups.pop_front() {
923                        Some(idx) => idx,
924                        None => return Poll::Ready(None),
925                    };
926
927                    let reader = self.reader_factory.take().expect("lost reader factory");
928
929                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
930
931                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
932
933                    let fut = reader
934                        .read_row_group(
935                            row_group_idx,
936                            selection,
937                            self.projection.clone(),
938                            self.batch_size,
939                        )
940                        .boxed();
941
942                    self.state = StreamState::Reading(fut)
943                }
944                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
945                    Ok((reader_factory, maybe_reader)) => {
946                        self.reader_factory = Some(reader_factory);
947                        match maybe_reader {
948                            // Read records from [`ParquetRecordBatchReader`]
949                            Some(reader) => self.state = StreamState::Decoding(reader),
950                            // All rows skipped, read next row group
951                            None => self.state = StreamState::Init,
952                        }
953                    }
954                    Err(e) => {
955                        self.state = StreamState::Error;
956                        return Poll::Ready(Some(Err(e)));
957                    }
958                },
959                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
960            }
961        }
962    }
963}
964
965/// An in-memory collection of column chunks
966struct InMemoryRowGroup<'a> {
967    offset_index: Option<&'a [OffsetIndexMetaData]>,
968    /// Column chunks for this row group
969    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
970    row_count: usize,
971    row_group_idx: usize,
972    metadata: &'a ParquetMetaData,
973}
974
975impl InMemoryRowGroup<'_> {
976    /// Fetches any additional column data specified in `projection` that is not already
977    /// present in `self.column_chunks`.
978    ///
979    /// If `selection` is provided, only the pages required for the selection
980    /// are fetched. Otherwise, all pages are fetched.
981    async fn fetch<T: AsyncFileReader + Send>(
982        &mut self,
983        input: &mut T,
984        projection: &ProjectionMask,
985        selection: Option<&RowSelection>,
986        batch_size: usize,
987        cache_mask: Option<&ProjectionMask>,
988    ) -> Result<()> {
989        let metadata = self.metadata.row_group(self.row_group_idx);
990        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
991            let expanded_selection =
992                selection.expand_to_batch_boundaries(batch_size, self.row_count);
993            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
994            // `RowSelection`
995            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
996
997            let fetch_ranges = self
998                .column_chunks
999                .iter()
1000                .zip(metadata.columns())
1001                .enumerate()
1002                .filter(|&(idx, (chunk, _chunk_meta))| {
1003                    chunk.is_none() && projection.leaf_included(idx)
1004                })
1005                .flat_map(|(idx, (_chunk, chunk_meta))| {
1006                    // If the first page does not start at the beginning of the column,
1007                    // then we need to also fetch a dictionary page.
1008                    let mut ranges: Vec<Range<u64>> = vec![];
1009                    let (start, _len) = chunk_meta.byte_range();
1010                    match offset_index[idx].page_locations.first() {
1011                        Some(first) if first.offset as u64 != start => {
1012                            ranges.push(start..first.offset as u64);
1013                        }
1014                        _ => (),
1015                    }
1016
1017                    // Expand selection to batch boundaries only for cached columns
1018                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1019                    if use_expanded {
1020                        ranges.extend(
1021                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1022                        );
1023                    } else {
1024                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1025                    }
1026                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1027
1028                    ranges
1029                })
1030                .collect();
1031
1032            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1033            let mut page_start_offsets = page_start_offsets.into_iter();
1034
1035            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1036                if chunk.is_some() || !projection.leaf_included(idx) {
1037                    continue;
1038                }
1039
1040                if let Some(offsets) = page_start_offsets.next() {
1041                    let mut chunks = Vec::with_capacity(offsets.len());
1042                    for _ in 0..offsets.len() {
1043                        chunks.push(chunk_data.next().unwrap());
1044                    }
1045
1046                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1047                        length: metadata.column(idx).byte_range().1 as usize,
1048                        data: offsets
1049                            .into_iter()
1050                            .map(|x| x as usize)
1051                            .zip(chunks.into_iter())
1052                            .collect(),
1053                    }))
1054                }
1055            }
1056        } else {
1057            let fetch_ranges = self
1058                .column_chunks
1059                .iter()
1060                .enumerate()
1061                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1062                .map(|(idx, _chunk)| {
1063                    let column = metadata.column(idx);
1064                    let (start, length) = column.byte_range();
1065                    start..(start + length)
1066                })
1067                .collect();
1068
1069            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1070
1071            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1072                if chunk.is_some() || !projection.leaf_included(idx) {
1073                    continue;
1074                }
1075
1076                if let Some(data) = chunk_data.next() {
1077                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
1078                        offset: metadata.column(idx).byte_range().0 as usize,
1079                        data,
1080                    }));
1081                }
1082            }
1083        }
1084
1085        Ok(())
1086    }
1087}
1088
1089impl RowGroups for InMemoryRowGroup<'_> {
1090    fn num_rows(&self) -> usize {
1091        self.row_count
1092    }
1093
1094    /// Return chunks for column i
1095    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1096        match &self.column_chunks[i] {
1097            None => Err(ParquetError::General(format!(
1098                "Invalid column index {i}, column was not fetched"
1099            ))),
1100            Some(data) => {
1101                let page_locations = self
1102                    .offset_index
                    // filter out empty offset indexes (old versions wrote Some(vec![]) when none was present)
1104                    .filter(|index| !index.is_empty())
1105                    .map(|index| index[i].page_locations.clone());
1106                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1107                let page_reader = SerializedPageReader::new(
1108                    data.clone(),
1109                    column_chunk_metadata,
1110                    self.row_count,
1111                    page_locations,
1112                )?;
1113                let page_reader = page_reader.add_crypto_context(
1114                    self.row_group_idx,
1115                    i,
1116                    self.metadata,
1117                    column_chunk_metadata,
1118                )?;
1119
1120                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1121
1122                Ok(Box::new(ColumnChunkIterator {
1123                    reader: Some(Ok(page_reader)),
1124                }))
1125            }
1126        }
1127    }
1128}
1129
1130/// An in-memory column chunk
1131#[derive(Clone)]
1132enum ColumnChunkData {
1133    /// Column chunk data representing only a subset of data pages
1134    Sparse {
1135        /// Length of the full column chunk
1136        length: usize,
1137        /// Subset of data pages included in this sparse chunk.
1138        ///
1139        /// Each element is a tuple of (page offset within file, page data).
1140        /// Each entry is a complete page and the list is ordered by offset.
1141        data: Vec<(usize, Bytes)>,
1142    },
1143    /// Full column chunk and the offset within the original file
1144    Dense { offset: usize, data: Bytes },
1145}
1146
1147impl ColumnChunkData {
1148    /// Return the data for this column chunk at the given offset
1149    fn get(&self, start: u64) -> Result<Bytes> {
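        // Sparse chunks only contain the fetched pages, so `start` must match
        // the file offset of one of those pages exactly; dense chunks can be
        // sliced at any offset within the chunk.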
1150        match &self {
1151            ColumnChunkData::Sparse { data, .. } => data
1152                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1153                .map(|idx| data[idx].1.clone())
1154                .map_err(|_| {
1155                    ParquetError::General(format!(
1156                        "Invalid offset in sparse column chunk data: {start}"
1157                    ))
1158                }),
1159            ColumnChunkData::Dense { offset, data } => {
1160                let start = start as usize - *offset;
1161                Ok(data.slice(start..))
1162            }
1163        }
1164    }
1165}
1166
1167impl Length for ColumnChunkData {
1168    /// Return the total length of the full column chunk
1169    fn len(&self) -> u64 {
1170        match &self {
1171            ColumnChunkData::Sparse { length, .. } => *length as u64,
1172            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1173        }
1174    }
1175}
1176
1177impl ChunkReader for ColumnChunkData {
1178    type T = bytes::buf::Reader<Bytes>;
1179
1180    fn get_read(&self, start: u64) -> Result<Self::T> {
1181        Ok(self.get(start)?.reader())
1182    }
1183
1184    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1185        Ok(self.get(start)?.slice(..length))
1186    }
1187}
1188
1189/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1190struct ColumnChunkIterator {
1191    reader: Option<Result<Box<dyn PageReader>>>,
1192}
1193
1194impl Iterator for ColumnChunkIterator {
1195    type Item = Result<Box<dyn PageReader>>;
1196
1197    fn next(&mut self) -> Option<Self::Item> {
1198        self.reader.take()
1199    }
1200}
1201
1202impl PageIterator for ColumnChunkIterator {}
1203
1204#[cfg(test)]
1205mod tests {
1206    use super::*;
1207    use crate::arrow::arrow_reader::{
1208        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1209    };
1210    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1211    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1212    use crate::arrow::ArrowWriter;
1213    use crate::file::metadata::ParquetMetaDataReader;
1214    use crate::file::properties::WriterProperties;
1215    use arrow::compute::kernels::cmp::eq;
1216    use arrow::error::Result as ArrowResult;
1217    use arrow_array::builder::{ListBuilder, StringBuilder};
1218    use arrow_array::cast::AsArray;
1219    use arrow_array::types::Int32Type;
1220    use arrow_array::{
1221        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1222        StructArray, UInt64Array,
1223    };
1224    use arrow_schema::{DataType, Field, Schema};
1225    use futures::{StreamExt, TryStreamExt};
1226    use rand::{rng, Rng};
1227    use std::collections::HashMap;
1228    use std::sync::{Arc, Mutex};
1229    use tempfile::tempfile;
1230
1231    #[derive(Clone)]
1232    struct TestReader {
1233        data: Bytes,
1234        metadata: Option<Arc<ParquetMetaData>>,
1235        requests: Arc<Mutex<Vec<Range<usize>>>>,
1236    }
1237
1238    impl TestReader {
1239        fn new(data: Bytes) -> Self {
1240            Self {
1241                data,
1242                metadata: Default::default(),
1243                requests: Default::default(),
1244            }
1245        }
1246    }
1247
1248    impl AsyncFileReader for TestReader {
1249        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1250            let range = range.clone();
1251            self.requests
1252                .lock()
1253                .unwrap()
1254                .push(range.start as usize..range.end as usize);
1255            futures::future::ready(Ok(self
1256                .data
1257                .slice(range.start as usize..range.end as usize)))
1258            .boxed()
1259        }
1260
1261        fn get_metadata<'a>(
1262            &'a mut self,
1263            options: Option<&'a ArrowReaderOptions>,
1264        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1265            let metadata_reader = ParquetMetaDataReader::new()
1266                .with_page_indexes(options.is_some_and(|o| o.page_index));
1267            self.metadata = Some(Arc::new(
1268                metadata_reader.parse_and_finish(&self.data).unwrap(),
1269            ));
1270            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1271        }
1272    }
1273
1274    #[tokio::test]
1275    async fn test_async_reader() {
1276        let testdata = arrow::util::test_util::parquet_test_data();
1277        let path = format!("{testdata}/alltypes_plain.parquet");
1278        let data = Bytes::from(std::fs::read(path).unwrap());
1279
1280        let async_reader = TestReader::new(data.clone());
1281
1282        let requests = async_reader.requests.clone();
1283        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1284            .await
1285            .unwrap();
1286
1287        let metadata = builder.metadata().clone();
1288        assert_eq!(metadata.num_row_groups(), 1);
1289
1290        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1291        let stream = builder
1292            .with_projection(mask.clone())
1293            .with_batch_size(1024)
1294            .build()
1295            .unwrap();
1296
1297        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1298
1299        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1300            .unwrap()
1301            .with_projection(mask)
1302            .with_batch_size(104)
1303            .build()
1304            .unwrap()
1305            .collect::<ArrowResult<Vec<_>>>()
1306            .unwrap();
1307
1308        assert_eq!(async_batches, sync_batches);
1309
1310        let requests = requests.lock().unwrap();
1311        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1312        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1313
1314        assert_eq!(
1315            &requests[..],
1316            &[
1317                offset_1 as usize..(offset_1 + length_1) as usize,
1318                offset_2 as usize..(offset_2 + length_2) as usize
1319            ]
1320        );
1321    }
1322
1323    #[tokio::test]
1324    async fn test_async_reader_with_next_row_group() {
1325        let testdata = arrow::util::test_util::parquet_test_data();
1326        let path = format!("{testdata}/alltypes_plain.parquet");
1327        let data = Bytes::from(std::fs::read(path).unwrap());
1328
1329        let async_reader = TestReader::new(data.clone());
1330
1331        let requests = async_reader.requests.clone();
1332        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1333            .await
1334            .unwrap();
1335
1336        let metadata = builder.metadata().clone();
1337        assert_eq!(metadata.num_row_groups(), 1);
1338
1339        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1340        let mut stream = builder
1341            .with_projection(mask.clone())
1342            .with_batch_size(1024)
1343            .build()
1344            .unwrap();
1345
1346        let mut readers = vec![];
1347        while let Some(reader) = stream.next_row_group().await.unwrap() {
1348            readers.push(reader);
1349        }
1350
1351        let async_batches: Vec<_> = readers
1352            .into_iter()
1353            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1354            .collect();
1355
1356        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1357            .unwrap()
1358            .with_projection(mask)
1359            .with_batch_size(1024)
1360            .build()
1361            .unwrap()
1362            .collect::<ArrowResult<Vec<_>>>()
1363            .unwrap();
1364
1365        assert_eq!(async_batches, sync_batches);
1366
1367        let requests = requests.lock().unwrap();
1368        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1369        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1370
1371        assert_eq!(
1372            &requests[..],
1373            &[
1374                offset_1 as usize..(offset_1 + length_1) as usize,
1375                offset_2 as usize..(offset_2 + length_2) as usize
1376            ]
1377        );
1378    }
1379
1380    #[tokio::test]
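    // Read with the page index enabled and verify the column and offset indexes are
    // loaded before comparing the async output against the sync reader.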
1381    async fn test_async_reader_with_index() {
1382        let testdata = arrow::util::test_util::parquet_test_data();
1383        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1384        let data = Bytes::from(std::fs::read(path).unwrap());
1385
1386        let async_reader = TestReader::new(data.clone());
1387
1388        let options = ArrowReaderOptions::new().with_page_index(true);
1389        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1390            .await
1391            .unwrap();
1392
1393        // The builder should now have the column and offset indexes loaded
1394        let metadata_with_index = builder.metadata();
1395        assert_eq!(metadata_with_index.num_row_groups(), 1);
1396
1397        // Check that the column and offset indexes were loaded for the row group
1398        let offset_index = metadata_with_index.offset_index().unwrap();
1399        let column_index = metadata_with_index.column_index().unwrap();
1400
1401        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1402        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1403
1404        let num_columns = metadata_with_index
1405            .file_metadata()
1406            .schema_descr()
1407            .num_columns();
1408
1409        // Check index entries are present for every column in each row group
1410        offset_index
1411            .iter()
1412            .for_each(|x| assert_eq!(x.len(), num_columns));
1413        column_index
1414            .iter()
1415            .for_each(|x| assert_eq!(x.len(), num_columns));
1416
1417        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1418        let stream = builder
1419            .with_projection(mask.clone())
1420            .with_batch_size(1024)
1421            .build()
1422            .unwrap();
1423
1424        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1425
1426        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1427            .unwrap()
1428            .with_projection(mask)
1429            .with_batch_size(1024)
1430            .build()
1431            .unwrap()
1432            .collect::<ArrowResult<Vec<_>>>()
1433            .unwrap();
1434
1435        assert_eq!(async_batches, sync_batches);
1436    }
1437
1438    #[tokio::test]
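    // Verify that `with_limit` produces the same results for the async and sync readers.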
1439    async fn test_async_reader_with_limit() {
1440        let testdata = arrow::util::test_util::parquet_test_data();
1441        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1442        let data = Bytes::from(std::fs::read(path).unwrap());
1443
1444        let metadata = ParquetMetaDataReader::new()
1445            .parse_and_finish(&data)
1446            .unwrap();
1447        let metadata = Arc::new(metadata);
1448
1449        assert_eq!(metadata.num_row_groups(), 1);
1450
1451        let async_reader = TestReader::new(data.clone());
1452
1453        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1454            .await
1455            .unwrap();
1456
1457        assert_eq!(builder.metadata().num_row_groups(), 1);
1458
1459        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1460        let stream = builder
1461            .with_projection(mask.clone())
1462            .with_batch_size(1024)
1463            .with_limit(1)
1464            .build()
1465            .unwrap();
1466
1467        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1468
1469        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1470            .unwrap()
1471            .with_projection(mask)
1472            .with_batch_size(1024)
1473            .with_limit(1)
1474            .build()
1475            .unwrap()
1476            .collect::<ArrowResult<Vec<_>>>()
1477            .unwrap();
1478
1479        assert_eq!(async_batches, sync_batches);
1480    }
1481
1482    #[tokio::test]
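    // Use a `RowSelection` that skips and selects whole pages, as well as ranges that
    // straddle page boundaries, and compare the async output against the sync reader.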
1483    async fn test_async_reader_skip_pages() {
1484        let testdata = arrow::util::test_util::parquet_test_data();
1485        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1486        let data = Bytes::from(std::fs::read(path).unwrap());
1487
1488        let async_reader = TestReader::new(data.clone());
1489
1490        let options = ArrowReaderOptions::new().with_page_index(true);
1491        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1492            .await
1493            .unwrap();
1494
1495        assert_eq!(builder.metadata().num_row_groups(), 1);
1496
1497        let selection = RowSelection::from(vec![
1498            RowSelector::skip(21),   // Skip first page
1499            RowSelector::select(21), // Select page to boundary
1500            RowSelector::skip(41),   // Skip multiple pages
1501            RowSelector::select(41), // Select multiple pages
1502            RowSelector::skip(25),   // Skip page across boundary
1503            RowSelector::select(25), // Select across page boundary
1504            RowSelector::skip(7116), // Skip to final page boundary
1505            RowSelector::select(10), // Select final page
1506        ]);
1507
1508        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1509
1510        let stream = builder
1511            .with_projection(mask.clone())
1512            .with_row_selection(selection.clone())
1513            .build()
1514            .expect("building stream");
1515
1516        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1517
1518        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1519            .unwrap()
1520            .with_projection(mask)
1521            .with_batch_size(1024)
1522            .with_row_selection(selection)
1523            .build()
1524            .unwrap()
1525            .collect::<ArrowResult<Vec<_>>>()
1526            .unwrap();
1527
1528        assert_eq!(async_batches, sync_batches);
1529    }
1530
1531    #[tokio::test]
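    // Fuzz test: generate random skip/select patterns covering 7300 rows and check
    // that the number of rows returned matches the number of selected rows.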
1532    async fn test_fuzz_async_reader_selection() {
1533        let testdata = arrow::util::test_util::parquet_test_data();
1534        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1535        let data = Bytes::from(std::fs::read(path).unwrap());
1536
1537        let mut rand = rng();
1538
1539        for _ in 0..100 {
1540            let mut expected_rows = 0;
1541            let mut total_rows = 0;
1542            let mut skip = false;
1543            let mut selectors = vec![];
1544
1545            while total_rows < 7300 {
1546                let row_count: usize = rand.random_range(1..100);
1547
1548                let row_count = row_count.min(7300 - total_rows);
1549
1550                selectors.push(RowSelector { row_count, skip });
1551
1552                total_rows += row_count;
1553                if !skip {
1554                    expected_rows += row_count;
1555                }
1556
1557                skip = !skip;
1558            }
1559
1560            let selection = RowSelection::from(selectors);
1561
1562            let async_reader = TestReader::new(data.clone());
1563
1564            let options = ArrowReaderOptions::new().with_page_index(true);
1565            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1566                .await
1567                .unwrap();
1568
1569            assert_eq!(builder.metadata().num_row_groups(), 1);
1570
1571            let col_idx: usize = rand.random_range(0..13);
1572            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1573
1574            let stream = builder
1575                .with_projection(mask.clone())
1576                .with_row_selection(selection.clone())
1577                .build()
1578                .expect("building stream");
1579
1580            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1581
1582            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1583
1584            assert_eq!(actual_rows, expected_rows);
1585        }
1586    }
1587
1588    #[tokio::test]
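    // A leading selector with `row_count: 0` must not affect the rows returned.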
1589    async fn test_async_reader_zero_row_selector() {
1590        // See https://github.com/apache/arrow-rs/issues/2669
1591        let testdata = arrow::util::test_util::parquet_test_data();
1592        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1593        let data = Bytes::from(std::fs::read(path).unwrap());
1594
1595        let mut rand = rng();
1596
1597        let mut expected_rows = 0;
1598        let mut total_rows = 0;
1599        let mut skip = false;
1600        let mut selectors = vec![];
1601
1602        selectors.push(RowSelector {
1603            row_count: 0,
1604            skip: false,
1605        });
1606
1607        while total_rows < 7300 {
1608            let row_count: usize = rand.random_range(1..100);
1609
1610            let row_count = row_count.min(7300 - total_rows);
1611
1612            selectors.push(RowSelector { row_count, skip });
1613
1614            total_rows += row_count;
1615            if !skip {
1616                expected_rows += row_count;
1617            }
1618
1619            skip = !skip;
1620        }
1621
1622        let selection = RowSelection::from(selectors);
1623
1624        let async_reader = TestReader::new(data.clone());
1625
1626        let options = ArrowReaderOptions::new().with_page_index(true);
1627        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1628            .await
1629            .unwrap();
1630
1631        assert_eq!(builder.metadata().num_row_groups(), 1);
1632
1633        let col_idx: usize = rand.random_range(0..13);
1634        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1635
1636        let stream = builder
1637            .with_projection(mask.clone())
1638            .with_row_selection(selection.clone())
1639            .build()
1640            .expect("building stream");
1641
1642        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1643
1644        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1645
1646        assert_eq!(actual_rows, expected_rows);
1647    }
1648
1649    #[tokio::test]
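    // Apply a single `RowFilter` predicate (`a == "b"`), check the filtered output,
    // and verify only two byte-range requests are made (predicate + projection).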
1650    async fn test_row_filter() {
1651        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1652        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1653        let data = RecordBatch::try_from_iter([
1654            ("a", Arc::new(a) as ArrayRef),
1655            ("b", Arc::new(b) as ArrayRef),
1656        ])
1657        .unwrap();
1658
1659        let mut buf = Vec::with_capacity(1024);
1660        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1661        writer.write(&data).unwrap();
1662        writer.close().unwrap();
1663
1664        let data: Bytes = buf.into();
1665        let metadata = ParquetMetaDataReader::new()
1666            .parse_and_finish(&data)
1667            .unwrap();
1668        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1669
1670        let test = TestReader::new(data);
1671        let requests = test.requests.clone();
1672
1673        let a_scalar = StringArray::from_iter_values(["b"]);
1674        let a_filter = ArrowPredicateFn::new(
1675            ProjectionMask::leaves(&parquet_schema, vec![0]),
1676            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1677        );
1678
1679        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1680
1681        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1682        let stream = ParquetRecordBatchStreamBuilder::new(test)
1683            .await
1684            .unwrap()
1685            .with_projection(mask.clone())
1686            .with_batch_size(1024)
1687            .with_row_filter(filter)
1688            .build()
1689            .unwrap();
1690
1691        let batches: Vec<_> = stream.try_collect().await.unwrap();
1692        assert_eq!(batches.len(), 1);
1693
1694        let batch = &batches[0];
1695        assert_eq!(batch.num_columns(), 2);
1696
1697        // Filter should have kept only rows with "b" in column 0
1698        assert_eq!(
1699            batch.column(0).as_ref(),
1700            &StringArray::from_iter_values(["b", "b", "b"])
1701        );
1702        assert_eq!(
1703            batch.column(1).as_ref(),
1704            &StringArray::from_iter_values(["2", "3", "4"])
1705        );
1706
1707        // Should only have made 2 requests:
1708        // * First request fetches data for evaluating the predicate
1709        // * Second request fetches data for evaluating the projection
1710        assert_eq!(requests.lock().unwrap().len(), 2);
1711    }
1712
1713    #[tokio::test]
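    // Apply two chained predicates (`a == "b"`, then `b == "4"`), check the single
    // surviving row, and verify the expected three byte-range requests.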
1714    async fn test_two_row_filters() {
1715        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1716        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1717        let c = Int32Array::from_iter(0..6);
1718        let data = RecordBatch::try_from_iter([
1719            ("a", Arc::new(a) as ArrayRef),
1720            ("b", Arc::new(b) as ArrayRef),
1721            ("c", Arc::new(c) as ArrayRef),
1722        ])
1723        .unwrap();
1724
1725        let mut buf = Vec::with_capacity(1024);
1726        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1727        writer.write(&data).unwrap();
1728        writer.close().unwrap();
1729
1730        let data: Bytes = buf.into();
1731        let metadata = ParquetMetaDataReader::new()
1732            .parse_and_finish(&data)
1733            .unwrap();
1734        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1735
1736        let test = TestReader::new(data);
1737        let requests = test.requests.clone();
1738
1739        let a_scalar = StringArray::from_iter_values(["b"]);
1740        let a_filter = ArrowPredicateFn::new(
1741            ProjectionMask::leaves(&parquet_schema, vec![0]),
1742            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1743        );
1744
1745        let b_scalar = StringArray::from_iter_values(["4"]);
1746        let b_filter = ArrowPredicateFn::new(
1747            ProjectionMask::leaves(&parquet_schema, vec![1]),
1748            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1749        );
1750
1751        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1752
1753        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1754        let stream = ParquetRecordBatchStreamBuilder::new(test)
1755            .await
1756            .unwrap()
1757            .with_projection(mask.clone())
1758            .with_batch_size(1024)
1759            .with_row_filter(filter)
1760            .build()
1761            .unwrap();
1762
1763        let batches: Vec<_> = stream.try_collect().await.unwrap();
1764        assert_eq!(batches.len(), 1);
1765
1766        let batch = &batches[0];
1767        assert_eq!(batch.num_rows(), 1);
1768        assert_eq!(batch.num_columns(), 2);
1769
1770        let col = batch.column(0);
1771        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1772        assert_eq!(val, "b");
1773
1774        let col = batch.column(1);
1775        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1776        assert_eq!(val, 3);
1777
1778        // Should only have made 3 requests
1779        // * First request fetches data for evaluating the first predicate
1780        // * Second request fetches data for evaluating the second predicate
1781        // * Third request fetches data for evaluating the projection
1782        assert_eq!(requests.lock().unwrap().len(), 3);
1783    }
1784
1785    #[tokio::test]
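    // Write two 3-row row groups and check that `with_limit` and `with_offset` are
    // applied correctly across the row group boundary.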
1786    async fn test_limit_multiple_row_groups() {
1787        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1788        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1789        let c = Int32Array::from_iter(0..6);
1790        let data = RecordBatch::try_from_iter([
1791            ("a", Arc::new(a) as ArrayRef),
1792            ("b", Arc::new(b) as ArrayRef),
1793            ("c", Arc::new(c) as ArrayRef),
1794        ])
1795        .unwrap();
1796
1797        let mut buf = Vec::with_capacity(1024);
1798        let props = WriterProperties::builder()
1799            .set_max_row_group_size(3)
1800            .build();
1801        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1802        writer.write(&data).unwrap();
1803        writer.close().unwrap();
1804
1805        let data: Bytes = buf.into();
1806        let metadata = ParquetMetaDataReader::new()
1807            .parse_and_finish(&data)
1808            .unwrap();
1809
1810        assert_eq!(metadata.num_row_groups(), 2);
1811
1812        let test = TestReader::new(data);
1813
1814        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1815            .await
1816            .unwrap()
1817            .with_batch_size(1024)
1818            .with_limit(4)
1819            .build()
1820            .unwrap();
1821
1822        let batches: Vec<_> = stream.try_collect().await.unwrap();
1823        // Expect one batch for each row group
1824        assert_eq!(batches.len(), 2);
1825
1826        let batch = &batches[0];
1827        // First batch should contain all rows of the first row group
1828        assert_eq!(batch.num_rows(), 3);
1829        assert_eq!(batch.num_columns(), 3);
1830        let col2 = batch.column(2).as_primitive::<Int32Type>();
1831        assert_eq!(col2.values(), &[0, 1, 2]);
1832
1833        let batch = &batches[1];
1834        // Second batch should trigger the limit and only have one row
1835        assert_eq!(batch.num_rows(), 1);
1836        assert_eq!(batch.num_columns(), 3);
1837        let col2 = batch.column(2).as_primitive::<Int32Type>();
1838        assert_eq!(col2.values(), &[3]);
1839
1840        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1841            .await
1842            .unwrap()
1843            .with_offset(2)
1844            .with_limit(3)
1845            .build()
1846            .unwrap();
1847
1848        let batches: Vec<_> = stream.try_collect().await.unwrap();
1849        // Expect one batch for each row group
1850        assert_eq!(batches.len(), 2);
1851
1852        let batch = &batches[0];
1853        // First batch should contain one row
1854        assert_eq!(batch.num_rows(), 1);
1855        assert_eq!(batch.num_columns(), 3);
1856        let col2 = batch.column(2).as_primitive::<Int32Type>();
1857        assert_eq!(col2.values(), &[2]);
1858
1859        let batch = &batches[1];
1860        // Second batch should contain two rows
1861        assert_eq!(batch.num_rows(), 2);
1862        assert_eq!(batch.num_columns(), 3);
1863        let col2 = batch.column(2).as_primitive::<Int32Type>();
1864        assert_eq!(col2.values(), &[3, 4]);
1865
1866        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1867            .await
1868            .unwrap()
1869            .with_offset(4)
1870            .with_limit(20)
1871            .build()
1872            .unwrap();
1873
1874        let batches: Vec<_> = stream.try_collect().await.unwrap();
1875        // Should skip first row group
1876        assert_eq!(batches.len(), 1);
1877
1878        let batch = &batches[0];
1879        // First batch should contain two rows
1880        assert_eq!(batch.num_rows(), 2);
1881        assert_eq!(batch.num_columns(), 3);
1882        let col2 = batch.column(2).as_primitive::<Int32Type>();
1883        assert_eq!(col2.values(), &[4, 5]);
1884    }
1885
1886    #[tokio::test]
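    // Combine a `RowFilter` with the page index enabled and check the total number of
    // rows passing both predicates.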
1887    async fn test_row_filter_with_index() {
1888        let testdata = arrow::util::test_util::parquet_test_data();
1889        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1890        let data = Bytes::from(std::fs::read(path).unwrap());
1891
1892        let metadata = ParquetMetaDataReader::new()
1893            .parse_and_finish(&data)
1894            .unwrap();
1895        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1896
1897        assert_eq!(metadata.num_row_groups(), 1);
1898
1899        let async_reader = TestReader::new(data.clone());
1900
1901        let a_filter =
1902            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1903                Ok(batch.column(0).as_boolean().clone())
1904            });
1905
1906        let b_scalar = Int8Array::from(vec![2]);
1907        let b_filter = ArrowPredicateFn::new(
1908            ProjectionMask::leaves(&parquet_schema, vec![2]),
1909            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1910        );
1911
1912        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1913
1914        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1915
1916        let options = ArrowReaderOptions::new().with_page_index(true);
1917        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1918            .await
1919            .unwrap()
1920            .with_projection(mask.clone())
1921            .with_batch_size(1024)
1922            .with_row_filter(filter)
1923            .build()
1924            .unwrap();
1925
1926        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1927
1928        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1929
1930        assert_eq!(total_rows, 730);
1931    }
1932
1933    #[tokio::test]
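    // Build a `ReaderFactory` directly and check that, with a sparse `RowSelection`,
    // only the byte ranges of the selected pages are fetched.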
1934    async fn test_in_memory_row_group_sparse() {
1935        let testdata = arrow::util::test_util::parquet_test_data();
1936        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1937        let data = Bytes::from(std::fs::read(path).unwrap());
1938
1939        let metadata = ParquetMetaDataReader::new()
1940            .with_page_indexes(true)
1941            .parse_and_finish(&data)
1942            .unwrap();
1943
1944        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1945
1946        let mut metadata_builder = metadata.into_builder();
1947        let mut row_groups = metadata_builder.take_row_groups();
1948        row_groups.truncate(1);
1949        let row_group_meta = row_groups.pop().unwrap();
1950
1951        let metadata = metadata_builder
1952            .add_row_group(row_group_meta)
1953            .set_column_index(None)
1954            .set_offset_index(Some(vec![offset_index.clone()]))
1955            .build();
1956
1957        let metadata = Arc::new(metadata);
1958
1959        let num_rows = metadata.row_group(0).num_rows();
1960
1961        assert_eq!(metadata.num_row_groups(), 1);
1962
1963        let async_reader = TestReader::new(data.clone());
1964
1965        let requests = async_reader.requests.clone();
1966        let (_, fields) = parquet_to_arrow_schema_and_fields(
1967            metadata.file_metadata().schema_descr(),
1968            ProjectionMask::all(),
1969            None,
1970        )
1971        .unwrap();
1972
1973        let _schema_desc = metadata.file_metadata().schema_descr();
1974
1975        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1976
1977        let reader_factory = ReaderFactory {
1978            metadata,
1979            fields: fields.map(Arc::new),
1980            input: async_reader,
1981            filter: None,
1982            limit: None,
1983            offset: None,
1984            metrics: ArrowReaderMetrics::disabled(),
1985            max_predicate_cache_size: 0,
1986        };
1987
1988        let mut skip = true;
1989        let mut pages = offset_index[0].page_locations.iter().peekable();
1990
1991        // Set up `RowSelection` so that we skip every other page, selecting the last page
1992        let mut selectors = vec![];
1993        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1994        while let Some(page) = pages.next() {
1995            let num_rows = if let Some(next_page) = pages.peek() {
1996                next_page.first_row_index - page.first_row_index
1997            } else {
1998                num_rows - page.first_row_index
1999            };
2000
2001            if skip {
2002                selectors.push(RowSelector::skip(num_rows as usize));
2003            } else {
2004                selectors.push(RowSelector::select(num_rows as usize));
2005                let start = page.offset as usize;
2006                let end = start + page.compressed_page_size as usize;
2007                expected_page_requests.push(start..end);
2008            }
2009            skip = !skip;
2010        }
2011
2012        let selection = RowSelection::from(selectors);
2013
2014        let (_factory, _reader) = reader_factory
2015            .read_row_group(0, Some(selection), projection.clone(), 48)
2016            .await
2017            .expect("reading row group");
2018
2019        let requests = requests.lock().unwrap();
2020
2021        assert_eq!(&requests[..], &expected_page_requests)
2022    }
2023
2024    #[tokio::test]
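    // The configured batch size should be clamped to the number of rows in the file so
    // the reader does not over-allocate.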
2025    async fn test_batch_size_overallocate() {
2026        let testdata = arrow::util::test_util::parquet_test_data();
2027        // `alltypes_plain.parquet` only has 8 rows
2028        let path = format!("{testdata}/alltypes_plain.parquet");
2029        let data = Bytes::from(std::fs::read(path).unwrap());
2030
2031        let async_reader = TestReader::new(data.clone());
2032
2033        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2034            .await
2035            .unwrap();
2036
2037        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2038
2039        let stream = builder
2040            .with_projection(ProjectionMask::all())
2041            .with_batch_size(1024)
2042            .build()
2043            .unwrap();
2044        assert_ne!(1024, file_rows);
2045        assert_eq!(stream.batch_size, file_rows);
2046    }
2047
2048    #[tokio::test]
2049    async fn test_get_row_group_column_bloom_filter_without_length() {
2050        let testdata = arrow::util::test_util::parquet_test_data();
2051        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2052        let data = Bytes::from(std::fs::read(path).unwrap());
2053        test_get_row_group_column_bloom_filter(data, false).await;
2054    }
2055
2056    #[tokio::test]
2057    async fn test_parquet_record_batch_stream_schema() {
2058        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2059            schema.flattened_fields().iter().map(|f| f.name()).collect()
2060        }
2061
2062        // ParquetRecordBatchReaderBuilder::schema differs from ParquetRecordBatchReader::schema
2063        // and RecordBatch::schema: the builder schema preserves all fields and any custom
2064        // metadata, while the reader and batch schemas contain only the projected fields
2065        // and no metadata. Test that this behaviour remains consistent.
2066        //
2067        // Ensure same for asynchronous versions of the above.
2068
2069        // Prepare data for a schema with nested fields and custom metadata
2070        let mut metadata = HashMap::with_capacity(1);
2071        metadata.insert("key".to_string(), "value".to_string());
2072
2073        let nested_struct_array = StructArray::from(vec![
2074            (
2075                Arc::new(Field::new("d", DataType::Utf8, true)),
2076                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2077            ),
2078            (
2079                Arc::new(Field::new("e", DataType::Utf8, true)),
2080                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2081            ),
2082        ]);
2083        let struct_array = StructArray::from(vec![
2084            (
2085                Arc::new(Field::new("a", DataType::Int32, true)),
2086                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2087            ),
2088            (
2089                Arc::new(Field::new("b", DataType::UInt64, true)),
2090                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2091            ),
2092            (
2093                Arc::new(Field::new(
2094                    "c",
2095                    nested_struct_array.data_type().clone(),
2096                    true,
2097                )),
2098                Arc::new(nested_struct_array) as ArrayRef,
2099            ),
2100        ]);
2101
2102        let schema =
2103            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2104        let record_batch = RecordBatch::from(struct_array)
2105            .with_schema(schema.clone())
2106            .unwrap();
2107
2108        // Write parquet with custom metadata in schema
2109        let mut file = tempfile().unwrap();
2110        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2111        writer.write(&record_batch).unwrap();
2112        writer.close().unwrap();
2113
2114        let all_fields = ["a", "b", "c", "d", "e"];
2115        // (leaf indices in the projection mask, expected field names in the projected output schema)
2116        let projections = [
2117            (vec![], vec![]),
2118            (vec![0], vec!["a"]),
2119            (vec![0, 1], vec!["a", "b"]),
2120            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2121            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2122        ];
2123
2124        // Ensure we're consistent for each of these projections
2125        for (indices, expected_projected_names) in projections {
2126            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2127                // Builder schema should preserve all fields and metadata
2128                assert_eq!(get_all_field_names(&builder), all_fields);
2129                assert_eq!(builder.metadata, metadata);
2130                // Reader & batch schema should show only projected fields, and no metadata
2131                assert_eq!(get_all_field_names(&reader), expected_projected_names);
2132                assert_eq!(reader.metadata, HashMap::default());
2133                assert_eq!(get_all_field_names(&batch), expected_projected_names);
2134                assert_eq!(batch.metadata, HashMap::default());
2135            };
2136
2137            let builder =
2138                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2139            let sync_builder_schema = builder.schema().clone();
2140            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2141            let mut reader = builder.with_projection(mask).build().unwrap();
2142            let sync_reader_schema = reader.schema();
2143            let batch = reader.next().unwrap().unwrap();
2144            let sync_batch_schema = batch.schema();
2145            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2146
2147            // The asynchronous reader should behave the same
2148            let file = tokio::fs::File::from(file.try_clone().unwrap());
2149            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2150            let async_builder_schema = builder.schema().clone();
2151            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2152            let mut reader = builder.with_projection(mask).build().unwrap();
2153            let async_reader_schema = reader.schema().clone();
2154            let batch = reader.next().await.unwrap().unwrap();
2155            let async_batch_schema = batch.schema();
2156            assert_schemas(
2157                async_builder_schema,
2158                async_reader_schema,
2159                async_batch_schema,
2160            );
2161        }
2162    }
2163
2164    #[tokio::test]
2165    async fn test_get_row_group_column_bloom_filter_with_length() {
2166        // Convert to a new parquet file that records bloom_filter_length
2167        let testdata = arrow::util::test_util::parquet_test_data();
2168        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2169        let data = Bytes::from(std::fs::read(path).unwrap());
2170        let async_reader = TestReader::new(data.clone());
2171        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2172            .await
2173            .unwrap();
2174        let schema = builder.schema().clone();
2175        let stream = builder.build().unwrap();
2176        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2177
2178        let mut parquet_data = Vec::new();
2179        let props = WriterProperties::builder()
2180            .set_bloom_filter_enabled(true)
2181            .build();
2182        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2183        for batch in batches {
2184            writer.write(&batch).unwrap();
2185        }
2186        writer.close().unwrap();
2187
2188        // test the new parquet file
2189        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2190    }
2191
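    // Shared helper: read the bloom filter for row group 0, column 0 and check
    // membership for a value known to be present and one known to be absent.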
2192    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2193        let async_reader = TestReader::new(data.clone());
2194
2195        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2196            .await
2197            .unwrap();
2198
2199        let metadata = builder.metadata();
2200        assert_eq!(metadata.num_row_groups(), 1);
2201        let row_group = metadata.row_group(0);
2202        let column = row_group.column(0);
2203        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2204
2205        let sbbf = builder
2206            .get_row_group_column_bloom_filter(0, 0)
2207            .await
2208            .unwrap()
2209            .unwrap();
2210        assert!(sbbf.check(&"Hello"));
2211        assert!(!sbbf.check(&"Hello_Not_Exists"));
2212    }
2213
2214    #[tokio::test]
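    // Write a file with a nested list column spread across several data pages and
    // check that various skip/select patterns return the expected number of rows.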
2215    async fn test_nested_skip() {
2216        let schema = Arc::new(Schema::new(vec![
2217            Field::new("col_1", DataType::UInt64, false),
2218            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2219        ]));
2220
2221        // Writer properties that force small data pages within a single 1024-row row group
2222        let props = WriterProperties::builder()
2223            .set_data_page_row_count_limit(256)
2224            .set_write_batch_size(256)
2225            .set_max_row_group_size(1024);
2226
2227        // Write data
2228        let mut file = tempfile().unwrap();
2229        let mut writer =
2230            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2231
2232        let mut builder = ListBuilder::new(StringBuilder::new());
2233        for id in 0..1024 {
2234            match id % 3 {
2235                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2236                1 => builder.append_value([Some(format!("id_{id}"))]),
2237                _ => builder.append_null(),
2238            }
2239        }
2240        let refs = vec![
2241            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2242            Arc::new(builder.finish()) as ArrayRef,
2243        ];
2244
2245        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2246        writer.write(&batch).unwrap();
2247        writer.close().unwrap();
2248
2249        let selections = [
2250            RowSelection::from(vec![
2251                RowSelector::skip(313),
2252                RowSelector::select(1),
2253                RowSelector::skip(709),
2254                RowSelector::select(1),
2255            ]),
2256            RowSelection::from(vec![
2257                RowSelector::skip(255),
2258                RowSelector::select(1),
2259                RowSelector::skip(767),
2260                RowSelector::select(1),
2261            ]),
2262            RowSelection::from(vec![
2263                RowSelector::select(255),
2264                RowSelector::skip(1),
2265                RowSelector::select(767),
2266                RowSelector::skip(1),
2267            ]),
2268            RowSelection::from(vec![
2269                RowSelector::skip(254),
2270                RowSelector::select(1),
2271                RowSelector::select(1),
2272                RowSelector::skip(767),
2273                RowSelector::select(1),
2274            ]),
2275        ];
2276
2277        for selection in selections {
2278            let expected = selection.row_count();
2279            // Read data
2280            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2281                tokio::fs::File::from_std(file.try_clone().unwrap()),
2282                ArrowReaderOptions::new().with_page_index(true),
2283            )
2284            .await
2285            .unwrap();
2286
2287            reader = reader.with_row_selection(selection);
2288
2289            let mut stream = reader.build().unwrap();
2290
2291            let mut total_rows = 0;
2292            while let Some(rb) = stream.next().await {
2293                let rb = rb.unwrap();
2294                total_rows += rb.num_rows();
2295            }
2296            assert_eq!(total_rows, expected);
2297        }
2298    }
2299
2300    #[tokio::test]
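    // Like `test_two_row_filters`, but the second predicate projects a leaf inside a
    // nested struct column.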
2301    async fn test_row_filter_nested() {
2302        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2303        let b = StructArray::from(vec![
2304            (
2305                Arc::new(Field::new("aa", DataType::Utf8, true)),
2306                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2307            ),
2308            (
2309                Arc::new(Field::new("bb", DataType::Utf8, true)),
2310                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2311            ),
2312        ]);
2313        let c = Int32Array::from_iter(0..6);
2314        let data = RecordBatch::try_from_iter([
2315            ("a", Arc::new(a) as ArrayRef),
2316            ("b", Arc::new(b) as ArrayRef),
2317            ("c", Arc::new(c) as ArrayRef),
2318        ])
2319        .unwrap();
2320
2321        let mut buf = Vec::with_capacity(1024);
2322        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2323        writer.write(&data).unwrap();
2324        writer.close().unwrap();
2325
2326        let data: Bytes = buf.into();
2327        let metadata = ParquetMetaDataReader::new()
2328            .parse_and_finish(&data)
2329            .unwrap();
2330        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2331
2332        let test = TestReader::new(data);
2333        let requests = test.requests.clone();
2334
2335        let a_scalar = StringArray::from_iter_values(["b"]);
2336        let a_filter = ArrowPredicateFn::new(
2337            ProjectionMask::leaves(&parquet_schema, vec![0]),
2338            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2339        );
2340
2341        let b_scalar = StringArray::from_iter_values(["4"]);
2342        let b_filter = ArrowPredicateFn::new(
2343            ProjectionMask::leaves(&parquet_schema, vec![2]),
2344            move |batch| {
2345                // Filter on `bb`, the second field of the struct (the only child selected by this predicate's projection mask).
2346                let struct_array = batch
2347                    .column(0)
2348                    .as_any()
2349                    .downcast_ref::<StructArray>()
2350                    .unwrap();
2351                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2352            },
2353        );
2354
2355        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2356
2357        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2358        let stream = ParquetRecordBatchStreamBuilder::new(test)
2359            .await
2360            .unwrap()
2361            .with_projection(mask.clone())
2362            .with_batch_size(1024)
2363            .with_row_filter(filter)
2364            .build()
2365            .unwrap();
2366
2367        let batches: Vec<_> = stream.try_collect().await.unwrap();
2368        assert_eq!(batches.len(), 1);
2369
2370        let batch = &batches[0];
2371        assert_eq!(batch.num_rows(), 1);
2372        assert_eq!(batch.num_columns(), 2);
2373
2374        let col = batch.column(0);
2375        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2376        assert_eq!(val, "b");
2377
2378        let col = batch.column(1);
2379        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2380        assert_eq!(val, 3);
2381
2382        // Should only have made 3 requests
2383        // * First request fetches data for evaluating the first predicate
2384        // * Second request fetches data for evaluating the second predicate
2385        // * Third request fetches data for evaluating the projection
2386        assert_eq!(requests.lock().unwrap().len(), 3);
2387    }
2388
2389    #[tokio::test]
2390    async fn test_cache_projection_excludes_nested_columns() {
2391        use arrow_array::{ArrayRef, StringArray};
2392
2393        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
2394        let a = StringArray::from_iter_values(["r1", "r2"]);
2395        let b = StructArray::from(vec![
2396            (
2397                Arc::new(Field::new("aa", DataType::Utf8, true)),
2398                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
2399            ),
2400            (
2401                Arc::new(Field::new("bb", DataType::Utf8, true)),
2402                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
2403            ),
2404        ]);
2405
2406        let schema = Arc::new(Schema::new(vec![
2407            Field::new("a", DataType::Utf8, true),
2408            Field::new("b", b.data_type().clone(), true),
2409        ]));
2410
2411        let mut buf = Vec::new();
2412        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
2413        let batch = RecordBatch::try_from_iter([
2414            ("a", Arc::new(a) as ArrayRef),
2415            ("b", Arc::new(b) as ArrayRef),
2416        ])
2417        .unwrap();
2418        writer.write(&batch).unwrap();
2419        writer.close().unwrap();
2420
2421        // Load Parquet metadata
2422        let data: Bytes = buf.into();
2423        let metadata = ParquetMetaDataReader::new()
2424            .parse_and_finish(&data)
2425            .unwrap();
2426        let metadata = Arc::new(metadata);
2427
2428        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
2429        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
2430        let parquet_schema = metadata.file_metadata().schema_descr();
2431        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
2432
2433        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
2434            Ok(arrow_array::BooleanArray::from(vec![
2435                true;
2436                batch.num_rows()
2437            ]))
2438        });
2439        let filter = RowFilter::new(vec![Box::new(always_true)]);
2440
2441        // Construct a ReaderFactory and compute cache projection
2442        let reader_factory = ReaderFactory {
2443            metadata: Arc::clone(&metadata),
2444            fields: None,
2445            input: TestReader::new(data),
2446            filter: Some(filter),
2447            limit: None,
2448            offset: None,
2449            metrics: ArrowReaderMetrics::disabled(),
2450            max_predicate_cache_size: 0,
2451        };
2452
2453        // Provide an output projection that also selects the same nested leaf
2454        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
2455
2456        // Expect None since nested columns should be excluded from cache projection
2457        assert!(cache_projection.is_none());
2458    }
2459
2460    #[tokio::test]
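    // Regression test: an offset index that is present but empty must not cause a
    // panic when reading a row group.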
2461    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2462        use tokio::fs::File;
2463        let testdata = arrow::util::test_util::parquet_test_data();
2464        let path = format!("{testdata}/alltypes_plain.parquet");
2465        let mut file = File::open(&path).await.unwrap();
2466        let file_size = file.metadata().await.unwrap().len();
2467        let mut metadata = ParquetMetaDataReader::new()
2468            .with_page_indexes(true)
2469            .load_and_finish(&mut file, file_size)
2470            .await
2471            .unwrap();
2472
2473        metadata.set_offset_index(Some(vec![]));
2474        let options = ArrowReaderOptions::new().with_page_index(true);
2475        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2476        let reader =
2477            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2478                .build()
2479                .unwrap();
2480
2481        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2482        assert_eq!(result.len(), 1);
2483    }
2484
2485    #[tokio::test]
2486    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2487        use tokio::fs::File;
2488        let testdata = arrow::util::test_util::parquet_test_data();
2489        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2490        let mut file = File::open(&path).await.unwrap();
2491        let file_size = file.metadata().await.unwrap().len();
2492        let metadata = ParquetMetaDataReader::new()
2493            .with_page_indexes(true)
2494            .load_and_finish(&mut file, file_size)
2495            .await
2496            .unwrap();
2497
2498        let options = ArrowReaderOptions::new().with_page_index(true);
2499        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2500        let reader =
2501            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2502                .build()
2503                .unwrap();
2504
2505        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2506        assert_eq!(result.len(), 8);
2507    }
2508
2509    #[tokio::test]
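    // Regression test: round-trip the metadata through a standalone metadata file and
    // check that reading with the reloaded metadata does not panic.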
2510    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2511        use tempfile::TempDir;
2512        use tokio::fs::File;
2513        fn write_metadata_to_local_file(
2514            metadata: ParquetMetaData,
2515            file: impl AsRef<std::path::Path>,
2516        ) {
2517            use crate::file::metadata::ParquetMetaDataWriter;
2518            use std::fs::File;
2519            let file = File::create(file).unwrap();
2520            ParquetMetaDataWriter::new(file, &metadata)
2521                .finish()
2522                .unwrap()
2523        }
2524
2525        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2526            use std::fs::File;
2527            let file = File::open(file).unwrap();
2528            ParquetMetaDataReader::new()
2529                .with_page_indexes(true)
2530                .parse_and_finish(&file)
2531                .unwrap()
2532        }
2533
2534        let testdata = arrow::util::test_util::parquet_test_data();
2535        let path = format!("{testdata}/alltypes_plain.parquet");
2536        let mut file = File::open(&path).await.unwrap();
2537        let file_size = file.metadata().await.unwrap().len();
2538        let metadata = ParquetMetaDataReader::new()
2539            .with_page_indexes(true)
2540            .load_and_finish(&mut file, file_size)
2541            .await
2542            .unwrap();
2543
2544        let tempdir = TempDir::new().unwrap();
2545        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2546        write_metadata_to_local_file(metadata, &metadata_path);
2547        let metadata = read_metadata_from_local_file(&metadata_path);
2548
2549        let options = ArrowReaderOptions::new().with_page_index(true);
2550        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2551        let reader =
2552            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2553                .build()
2554                .unwrap();
2555
2556        // This collect previously panicked; it should now succeed
2557        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2558        assert_eq!(result.len(), 1);
2559    }
2560
2561    #[tokio::test]
2562    async fn test_cached_array_reader_sparse_offset_error() {
2563        use futures::TryStreamExt;
2564
2565        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2566        use arrow_array::{BooleanArray, RecordBatch};
2567
2568        let testdata = arrow::util::test_util::parquet_test_data();
2569        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2570        let data = Bytes::from(std::fs::read(path).unwrap());
2571
2572        let async_reader = TestReader::new(data);
2573
2574        // Enable page index so the fetch logic loads only required pages
2575        let options = ArrowReaderOptions::new().with_page_index(true);
2576        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2577            .await
2578            .unwrap();
2579
2580        // Skip the first 22 rows (entire first Parquet page) and then select the
2581        // next 3 rows (22, 23, 24). This means the fetch step will not include
2582        // the first page starting at file offset 0.
2583        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2584
2585        // Trivial predicate on column 0 that always returns `true`. Using the
2586        // same column in both predicate and projection activates the caching
2587        // layer (Producer/Consumer pattern).
2588        let parquet_schema = builder.parquet_schema();
2589        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2590        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2591            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2592        });
2593        let filter = RowFilter::new(vec![Box::new(always_true)]);
2594
2595        // Build the stream with batch size 8 so the cache reads whole batches
2596        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2597        let stream = builder
2598            .with_batch_size(8)
2599            .with_projection(proj)
2600            .with_row_selection(selection)
2601            .with_row_filter(filter)
2602            .build()
2603            .unwrap();
2604
2605        // Collecting the stream exercises the sparse column chunk offset path and
2606        // should complete without error.
2607        let _result: Vec<_> = stream.try_collect().await.unwrap();
2608    }
2609}