parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::Bytes;
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::arrow_reader::{
42    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
43    RowFilter, RowSelection,
44};
45
46use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
47use crate::bloom_filter::{
48    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
49};
50use crate::errors::{ParquetError, Result};
51use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
52
53mod metadata;
54pub use metadata::*;
55
56#[cfg(feature = "object_store")]
57mod store;
58
59use crate::arrow::ProjectionMask;
60use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache};
61use crate::arrow::arrow_reader::ReadPlanBuilder;
62use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
63use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
64use crate::arrow::schema::ParquetField;
65#[cfg(feature = "object_store")]
66pub use store::*;
67
68/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
69///
70/// Notes:
71///
72/// 1. There is a default implementation for types that implement [`AsyncRead`]
73///    and [`AsyncSeek`], for example [`tokio::fs::File`].
74///
75/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
76///    is enabled, implements this interface for [`ObjectStore`].
77///
78/// [`ObjectStore`]: object_store::ObjectStore
79///
80/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
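///
/// # Example
///
/// A minimal sketch (using a placeholder `"data.parquet"` path) showing that a
/// [`tokio::fs::File`] can be used directly as an `AsyncFileReader` thanks to
/// the blanket implementation for [`AsyncRead`] + [`AsyncSeek`]:
///
/// ```no_run
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// let mut file = tokio::fs::File::open("data.parquet").await.unwrap();
/// // Read the first four bytes (the "PAR1" magic) through the trait
/// let magic = file.get_bytes(0..4).await.unwrap();
/// assert_eq!(&magic[..], b"PAR1");
/// # }
/// ```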
81pub trait AsyncFileReader: Send {
82    /// Retrieve the bytes in `range`
83    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
84
85    /// Retrieve multiple byte ranges. The default implementation fetches each range sequentially by calling `get_bytes`; implementations may override this to coalesce or parallelize requests.
86    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
87        async move {
88            let mut result = Vec::with_capacity(ranges.len());
89
90            for range in ranges.into_iter() {
91                let data = self.get_bytes(range).await?;
92                result.push(data);
93            }
94
95            Ok(result)
96        }
97        .boxed()
98    }
99
100    /// Return a future which resolves to the [`ParquetMetaData`] for this Parquet file.
101    ///
102    /// This is an asynchronous operation as it may involve reading the file
103    /// footer and potentially other metadata from disk or a remote source.
104    ///
105    /// Reading data from Parquet requires the metadata to understand the
106    /// schema, row groups, and location of pages within the file. This metadata
107    /// is stored primarily in the footer of the Parquet file, and can be read using
108    /// [`ParquetMetaDataReader`].
109    ///
110    /// However, implementations can significantly speed up reading Parquet by
111    /// supplying cached metadata or pre-fetched metadata via this API.
112    ///
113    /// # Parameters
114    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
115    ///   and other options that affect how the metadata is read.
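    ///
    /// # Example
    ///
    /// A minimal sketch of serving cached metadata (the `CachedMetadataReader`
    /// type below is hypothetical and not part of this crate): it returns a
    /// pre-loaded [`ParquetMetaData`] and delegates all byte fetches to an
    /// inner reader.
    ///
    /// ```
    /// # use std::ops::Range;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use futures::future::{BoxFuture, FutureExt};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::async_reader::AsyncFileReader;
    /// # use parquet::errors::Result;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// // Hypothetical wrapper holding metadata loaded ahead of time
    /// # #[allow(dead_code)]
    /// struct CachedMetadataReader<R> {
    ///     inner: R,
    ///     metadata: Arc<ParquetMetaData>,
    /// }
    ///
    /// impl<R: AsyncFileReader> AsyncFileReader for CachedMetadataReader<R> {
    ///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
    ///         // Delegate data reads to the wrapped reader
    ///         self.inner.get_bytes(range)
    ///     }
    ///
    ///     fn get_metadata<'a>(
    ///         &'a mut self,
    ///         _options: Option<&'a ArrowReaderOptions>,
    ///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///         // Serve the cached metadata without performing any I/O
    ///         let metadata = Arc::clone(&self.metadata);
    ///         async move { Ok(metadata) }.boxed()
    ///     }
    /// }
    /// ```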
116    fn get_metadata<'a>(
117        &'a mut self,
118        options: Option<&'a ArrowReaderOptions>,
119    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
120}
121
122/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
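///
/// A minimal sketch (using a placeholder `"data.parquet"` path) of erasing the
/// concrete reader type behind a trait object before handing it to the builder:
///
/// ```no_run
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// // Erase the concrete reader type behind `Box<dyn AsyncFileReader>`
/// let reader: Box<dyn AsyncFileReader> = Box::new(file);
/// let _builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
/// # }
/// ```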
123impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
124    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
125        self.as_mut().get_bytes(range)
126    }
127
128    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
129        self.as_mut().get_byte_ranges(ranges)
130    }
131
132    fn get_metadata<'a>(
133        &'a mut self,
134        options: Option<&'a ArrowReaderOptions>,
135    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
136        self.as_mut().get_metadata(options)
137    }
138}
139
140impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
141    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
142        async move {
143            self.seek(SeekFrom::End(-(suffix as i64))).await?;
144            let mut buf = Vec::with_capacity(suffix);
145            self.take(suffix as _).read_to_end(&mut buf).await?;
146            Ok(buf.into())
147        }
148        .boxed()
149    }
150}
151
152impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
153    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
154        async move {
155            self.seek(SeekFrom::Start(range.start)).await?;
156
157            let to_read = range.end - range.start;
158            let mut buffer = Vec::with_capacity(to_read.try_into()?);
159            let read = self.take(to_read).read_to_end(&mut buffer).await?;
160            if read as u64 != to_read {
161                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
162            }
163
164            Ok(buffer.into())
165        }
166        .boxed()
167    }
168
169    fn get_metadata<'a>(
170        &'a mut self,
171        options: Option<&'a ArrowReaderOptions>,
172    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
173        async move {
174            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
175                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
176            );
177
178            #[cfg(feature = "encryption")]
179            let metadata_reader = metadata_reader.with_decryption_properties(
180                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
181            );
182
183            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
184            Ok(Arc::new(parquet_metadata))
185        }
186        .boxed()
187    }
188}
189
190impl ArrowReaderMetadata {
191    /// Returns a new [`ArrowReaderMetadata`] loaded asynchronously from the provided input
192    ///
193    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
194    pub async fn load_async<T: AsyncFileReader>(
195        input: &mut T,
196        options: ArrowReaderOptions,
197    ) -> Result<Self> {
198        let metadata = input.get_metadata(Some(&options)).await?;
199        Self::try_new(metadata, options)
200    }
201}
202
203#[doc(hidden)]
204// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
205//
206// Allows sharing the same builder for both the sync and async versions, whilst also not
207// breaking the pre-existing ParquetRecordBatchStreamBuilder API
208pub struct AsyncReader<T>(T);
209
210/// A builder for reading parquet files from an `async` source as a [`ParquetRecordBatchStream`]
211///
212/// This can be used to decode a Parquet file in streaming fashion (without
213/// downloading the whole file at once) from a remote source, such as an object store.
214///
215/// This builder handles reading the parquet file metadata, allowing consumers
216/// to use this information to select what specific columns, row groups, etc.
217/// they wish to be read by the resulting stream.
218///
219/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
220///
221/// See [`ArrowReaderBuilder`] for additional member functions
222pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
223
224impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
225    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
226    /// specified source.
227    ///
228    /// # Example
229    /// ```
230    /// # #[tokio::main(flavor="current_thread")]
231    /// # async fn main() {
232    /// #
233    /// # use arrow_array::RecordBatch;
234    /// # use arrow::util::pretty::pretty_format_batches;
235    /// # use futures::TryStreamExt;
236    /// #
237    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
238    /// #
239    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
240    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
241    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
242    /// #     assert_eq!(
243    /// #          &actual_lines, expected_lines,
244    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
245    /// #          expected_lines, actual_lines
246    /// #      );
247    /// #  }
248    /// #
249    /// # let testdata = arrow::util::test_util::parquet_test_data();
250    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
251    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
252    /// // another async I/O reader such as a reader from an object store.
253    /// let file = tokio::fs::File::open(path).await.unwrap();
254    ///
255    /// // Configure options for reading from the async source
256    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
257    ///     .await
258    ///     .unwrap();
259    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
260    /// // a stream that can be used to incrementally read the data in batches
261    /// let stream = builder.build().unwrap();
262    /// // In this example, we collect the stream into a Vec<RecordBatch>
263    /// // but real applications would likely process the batches as they are read
264    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
265    /// // Demonstrate the results are as expected
266    /// assert_batches_eq(
267    ///     &results,
268    ///     &[
269    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
270    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
271    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
272    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
273    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
274    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
275    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
276    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
277    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
278    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
279    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
280    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
281    ///      ],
282    ///  );
283    /// # }
284    /// ```
285    ///
286    /// # Example configuring options and reading metadata
287    ///
288    /// There are many options that control the behavior of the reader, such as
289    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
290    ///
291    /// ```
292    /// # #[tokio::main(flavor="current_thread")]
293    /// # async fn main() {
294    /// #
295    /// # use arrow_array::RecordBatch;
296    /// # use arrow::util::pretty::pretty_format_batches;
297    /// # use futures::TryStreamExt;
298    /// #
299    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
300    /// #
301    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
302    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
303    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
304    /// #     assert_eq!(
305    /// #          &actual_lines, expected_lines,
306    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
307    /// #          expected_lines, actual_lines
308    /// #      );
309    /// #  }
310    /// #
311    /// # let testdata = arrow::util::test_util::parquet_test_data();
312    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
313    /// // As before, use tokio::fs::File to read data using async I/O.
314    /// let file = tokio::fs::File::open(path).await.unwrap();
315    ///
316    /// // Configure options for reading from the async source. In this case we set the batch size
317    /// // to 3, which produces batches of up to 3 rows at a time.
318    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
319    ///     .await
320    ///     .unwrap()
321    ///     .with_batch_size(3);
322    ///
323    /// // We can also read the metadata to inspect the schema and other metadata
324    /// // before actually reading the data
325    /// let file_metadata = builder.metadata().file_metadata();
326    /// // Specify that we only want to read columns 1, 2, and 6 (bool_col, tinyint_col, and float_col)
327    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
328    ///
329    /// let stream = builder.with_projection(mask).build().unwrap();
330    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
331    /// // Print out the results
332    /// assert_batches_eq(
333    ///     &results,
334    ///     &[
335    ///         "+----------+-------------+-----------+",
336    ///         "| bool_col | tinyint_col | float_col |",
337    ///         "+----------+-------------+-----------+",
338    ///         "| true     | 0           | 0.0       |",
339    ///         "| false    | 1           | 1.1       |",
340    ///         "| true     | 0           | 0.0       |",
341    ///         "| false    | 1           | 1.1       |",
342    ///         "| true     | 0           | 0.0       |",
343    ///         "| false    | 1           | 1.1       |",
344    ///         "| true     | 0           | 0.0       |",
345    ///         "| false    | 1           | 1.1       |",
346    ///         "+----------+-------------+-----------+",
347    ///      ],
348    ///  );
349    ///
350    /// // The result has 8 rows, and since we set the batch size to 3, we expect
351    /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
352    /// assert_eq!(results.len(), 3);
353    /// # }
354    /// ```
355    pub async fn new(input: T) -> Result<Self> {
356        Self::new_with_options(input, Default::default()).await
357    }
358
359    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
360    /// and [`ArrowReaderOptions`].
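    ///
    /// # Example
    ///
    /// A minimal sketch (using a placeholder `"data.parquet"` path) that enables
    /// reading the page index so that page-level pruning can be used:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// // Request that the page index is read along with the file metadata
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
    ///     .await
    ///     .unwrap();
    /// let _stream = builder.build().unwrap();
    /// # }
    /// ```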
361    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
362        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
363        Ok(Self::new_with_metadata(input, metadata))
364    }
365
366    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
367    ///
368    /// This allows loading the metadata once and using it to create multiple builders with
369    /// potentially different settings, which can then be read in parallel.
370    ///
371    /// # Example of reading from multiple streams in parallel
372    ///
373    /// ```
374    /// # use std::fs::metadata;
375    /// # use std::sync::Arc;
376    /// # use bytes::Bytes;
377    /// # use arrow_array::{Int32Array, RecordBatch};
378    /// # use arrow_schema::{DataType, Field, Schema};
379    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
380    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
381    /// # use tempfile::tempfile;
382    /// # use futures::StreamExt;
383    /// # #[tokio::main(flavor="current_thread")]
384    /// # async fn main() {
385    /// #
386    /// # let mut file = tempfile().unwrap();
387    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
388    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
389    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
390    /// # writer.write(&batch).unwrap();
391    /// # writer.close().unwrap();
392    /// // open file with parquet data
393    /// let mut file = tokio::fs::File::from_std(file);
394    /// // load metadata once
395    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
396    /// // create two readers, a and b, from the same underlying file
397    /// // without reading the metadata again
398    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
399    ///     file.try_clone().await.unwrap(),
400    ///     meta.clone()
401    /// ).build().unwrap();
402    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
403    ///
404    /// // Can read batches from both readers in parallel
405    /// assert_eq!(
406    ///   a.next().await.unwrap().unwrap(),
407    ///   b.next().await.unwrap().unwrap(),
408    /// );
409    /// # }
410    /// ```
411    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
412        Self::new_builder(AsyncReader(input), metadata)
413    }
414
415    /// Read bloom filter for a column in a row group
416    ///
417    /// Returns `None` if the column does not have a bloom filter
418    ///
419    /// This function should be called after other forms of pruning, such as projection and predicate pushdown, have been applied.
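    ///
    /// # Example
    ///
    /// A minimal sketch (using a placeholder `"data.parquet"` path and assuming
    /// the first column stores `i32` values) of probing the bloom filter of the
    /// first column in the first row group:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    /// if let Some(sbbf) = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .await
    ///     .unwrap()
    /// {
    ///     // `check` may return a false positive, but never a false negative
    ///     println!("value may be present: {}", sbbf.check(&123_i32));
    /// }
    /// # }
    /// ```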
420    pub async fn get_row_group_column_bloom_filter(
421        &mut self,
422        row_group_idx: usize,
423        column_idx: usize,
424    ) -> Result<Option<Sbbf>> {
425        let metadata = self.metadata.row_group(row_group_idx);
426        let column_metadata = metadata.column(column_idx);
427
428        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
429            offset
430                .try_into()
431                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
432        } else {
433            return Ok(None);
434        };
435
436        let buffer = match column_metadata.bloom_filter_length() {
437            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
438            None => self
439                .input
440                .0
441                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
442        }
443        .await?;
444
445        let (header, bitset_offset) =
446            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
447
448        match header.algorithm {
449            BloomFilterAlgorithm::BLOCK => {
450                // this match exists to future proof the singleton algorithm enum
451            }
452        }
453        match header.compression {
454            BloomFilterCompression::UNCOMPRESSED => {
455                // this match exists to future proof the singleton compression enum
456            }
457        }
458        match header.hash {
459            BloomFilterHash::XXHASH => {
460                // this match exists to future proof the singleton hash enum
461            }
462        }
463
464        let bitset = match column_metadata.bloom_filter_length() {
465            Some(_) => buffer.slice(
466                (TryInto::<usize>::try_into(bitset_offset).unwrap()
467                    - TryInto::<usize>::try_into(offset).unwrap())..,
468            ),
469            None => {
470                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
471                    ParquetError::General("Bloom filter length is invalid".to_string())
472                })?;
473                self.input
474                    .0
475                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
476                    .await?
477            }
478        };
479        Ok(Some(Sbbf::new(&bitset)))
480    }
481
482    /// Build a new [`ParquetRecordBatchStream`]
483    ///
484    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
485    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
486        let num_row_groups = self.metadata.row_groups().len();
487
488        let row_groups = match self.row_groups {
489            Some(row_groups) => {
490                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
491                    return Err(general_err!(
492                        "row group {} out of bounds 0..{}",
493                        col,
494                        num_row_groups
495                    ));
496                }
497                row_groups.into()
498            }
499            None => (0..self.metadata.row_groups().len()).collect(),
500        };
501
502        // Try to avoid allocating a buffer larger than the number of rows in the file
503        let batch_size = self
504            .batch_size
505            .min(self.metadata.file_metadata().num_rows() as usize);
506        let reader_factory = ReaderFactory {
507            input: self.input.0,
508            filter: self.filter,
509            metadata: self.metadata.clone(),
510            fields: self.fields,
511            limit: self.limit,
512            offset: self.offset,
513            metrics: self.metrics,
514            max_predicate_cache_size: self.max_predicate_cache_size,
515        };
516
517        // Ensure schema of ParquetRecordBatchStream respects projection, and does
518        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
519        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
520            Some(DataType::Struct(fields)) => {
521                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
522            }
523            None => Fields::empty(),
524            _ => unreachable!("Must be Struct for root type"),
525        };
526        let schema = Arc::new(Schema::new(projected_fields));
527
528        Ok(ParquetRecordBatchStream {
529            metadata: self.metadata,
530            batch_size,
531            row_groups,
532            projection: self.projection,
533            selection: self.selection,
534            schema,
535            reader_factory: Some(reader_factory),
536            state: StreamState::Init,
537        })
538    }
539}
540
541/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group
542///
543/// Note: If all rows in the row group are filtered out (e.g. by filters, limit or
544/// offset), returns `None` for the reader.
545type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
546
547/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
548/// [`ParquetRecordBatchReader`]
549struct ReaderFactory<T> {
550    metadata: Arc<ParquetMetaData>,
551
552    /// Top level parquet schema
553    fields: Option<Arc<ParquetField>>,
554
555    input: T,
556
557    /// Optional filter
558    filter: Option<RowFilter>,
559
560    /// Limit to apply to remaining row groups.
561    limit: Option<usize>,
562
563    /// Offset to apply to the next row groups read
564    offset: Option<usize>,
565
566    /// Metrics
567    metrics: ArrowReaderMetrics,
568
569    /// Maximum size of the predicate cache
570    ///
571    /// See [`RowGroupCache`] for details.
572    max_predicate_cache_size: usize,
573}
574
575impl<T> ReaderFactory<T>
576where
577    T: AsyncFileReader + Send,
578{
579    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
580    ///
581    /// Updates the `limit` and `offset` of the reader factory
582    ///
583    /// Note: this captures self so that the resulting future has a static lifetime
584    async fn read_row_group(
585        mut self,
586        row_group_idx: usize,
587        selection: Option<RowSelection>,
588        projection: ProjectionMask,
589        batch_size: usize,
590    ) -> ReadResult<T> {
591        // TODO: calling build_array multiple times is wasteful
592
593        let meta = self.metadata.row_group(row_group_idx);
594        let offset_index = self
595            .metadata
596            .offset_index()
597            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
598            .filter(|index| !index.is_empty())
599            .map(|x| x[row_group_idx].as_slice());
600
601        // Cache columns that are used by both the filters and the final projection so they are decoded only once
602        let cache_projection = match self.compute_cache_projection(&projection) {
603            Some(projection) => projection,
604            None => ProjectionMask::none(meta.columns().len()),
605        };
606        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
607            batch_size,
608            self.max_predicate_cache_size,
609        )));
610
611        let mut row_group = InMemoryRowGroup {
612            // schema: meta.schema_descr_ptr(),
613            row_count: meta.num_rows() as usize,
614            column_chunks: vec![None; meta.columns().len()],
615            offset_index,
616            row_group_idx,
617            metadata: self.metadata.as_ref(),
618        };
619
620        let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
621
622        let filter = self.filter.as_mut();
623        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
624
625        // Update selection based on any filters
626        if let Some(filter) = filter {
627            let cache_options = cache_options_builder.clone().producer();
628
629            for predicate in filter.predicates.iter_mut() {
630                if !plan_builder.selects_any() {
631                    return Ok((self, None)); // ruled out entire row group
632                }
633
634                // Pre-fetch only the columns that are selected by the predicate
635                let selection = plan_builder.selection();
636                // Fetch predicate columns; expand selection only for cached predicate columns
637                let cache_mask = Some(&cache_projection);
638                row_group
639                    .fetch(
640                        &mut self.input,
641                        predicate.projection(),
642                        selection,
643                        batch_size,
644                        cache_mask,
645                    )
646                    .await?;
647
648                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
649                    .with_cache_options(Some(&cache_options))
650                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
651
652                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
653            }
654        }
655
656        // Compute the number of rows in the selection before applying limit and offset
657        let rows_before = plan_builder
658            .num_rows_selected()
659            .unwrap_or(row_group.row_count);
660
661        if rows_before == 0 {
662            return Ok((self, None)); // ruled out entire row group
663        }
664
665        // Apply any limit and offset
666        let plan_builder = plan_builder
667            .limited(row_group.row_count)
668            .with_offset(self.offset)
669            .with_limit(self.limit)
670            .build_limited();
671
672        let rows_after = plan_builder
673            .num_rows_selected()
674            .unwrap_or(row_group.row_count);
675
676        // Update running offset and limit for after the current row group is read
677        if let Some(offset) = &mut self.offset {
678            // The reduction is due to either the offset or the limit; since the limit is applied
679            // only after the offset has been "exhausted", saturating subtraction suffices here
680            *offset = offset.saturating_sub(rows_before - rows_after)
681        }
682
683        if rows_after == 0 {
684            return Ok((self, None)); // ruled out entire row group
685        }
686
687        if let Some(limit) = &mut self.limit {
688            *limit -= rows_after;
689        }
690        // fetch the pages needed for decoding
691        row_group
692            // Final projection fetch shouldn't expand selection for cache; pass None
693            .fetch(
694                &mut self.input,
695                &projection,
696                plan_builder.selection(),
697                batch_size,
698                None,
699            )
700            .await?;
701
702        let plan = plan_builder.build();
703
704        let cache_options = cache_options_builder.consumer();
705        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
706            .with_cache_options(Some(&cache_options))
707            .build_array_reader(self.fields.as_deref(), &projection)?;
708
709        let reader = ParquetRecordBatchReader::new(array_reader, plan);
710
711        Ok((self, Some(reader)))
712    }
713
714    /// Compute which columns are used by both the filters and the final (output) projection
715    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
716        // Do not compute the projection mask if the predicate cache is disabled
717        if self.max_predicate_cache_size == 0 {
718            return None;
719        }
720
721        let filters = self.filter.as_ref()?;
722        let mut cache_projection = filters.predicates.first()?.projection().clone();
723        for predicate in filters.predicates.iter() {
724            cache_projection.union(predicate.projection());
725        }
726        cache_projection.intersect(projection);
727        self.exclude_nested_columns_from_cache(&cache_projection)
728    }
729
730    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
731    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
732        let schema = self.metadata.file_metadata().schema_descr();
733        let num_leaves = schema.num_columns();
734
735        // Count how many leaves each root column has
736        let num_roots = schema.root_schema().get_fields().len();
737        let mut root_leaf_counts = vec![0usize; num_roots];
738        for leaf_idx in 0..num_leaves {
739            let root_idx = schema.get_column_root_idx(leaf_idx);
740            root_leaf_counts[root_idx] += 1;
741        }
742
743        // Keep only leaves whose root has exactly one leaf (non-nested)
744        let mut included_leaves = Vec::new();
745        for leaf_idx in 0..num_leaves {
746            if mask.leaf_included(leaf_idx) {
747                let root_idx = schema.get_column_root_idx(leaf_idx);
748                if root_leaf_counts[root_idx] == 1 {
749                    included_leaves.push(leaf_idx);
750                }
751            }
752        }
753
754        if included_leaves.is_empty() {
755            None
756        } else {
757            Some(ProjectionMask::leaves(schema, included_leaves))
758        }
759    }
760}
761
762enum StreamState<T> {
763    /// At the start of a new row group, or the end of the parquet stream
764    Init,
765    /// Decoding a batch
766    Decoding(ParquetRecordBatchReader),
767    /// Reading data from input
768    Reading(BoxFuture<'static, ReadResult<T>>),
769    /// Error
770    Error,
771}
772
773impl<T> std::fmt::Debug for StreamState<T> {
774    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
775        match self {
776            StreamState::Init => write!(f, "StreamState::Init"),
777            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
778            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
779            StreamState::Error => write!(f, "StreamState::Error"),
780        }
781    }
782}
783
784/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
785///
786/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
787/// allowing users to decode record batches separately from I/O.
788///
789/// # I/O Buffering
790///
791/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
792/// (projection + filtering, etc) and decodes the rows from those buffered pages.
793///
794/// For example, if all rows and columns are selected, the entire row group is
795/// buffered in memory during decode. This minimizes the number of IO operations
796/// required, which is especially important for object stores, where IO operations
797/// have latencies in the hundreds of milliseconds.
798///
799///
800/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
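///
/// # Example
///
/// A minimal sketch (using a placeholder `"data.parquet"` path) of processing
/// batches incrementally with the [`Stream`] API via `futures::TryStreamExt`:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await
///     .unwrap()
///     .build()
///     .unwrap();
/// // Process each batch as it is decoded rather than collecting them all
/// while let Some(batch) = stream.try_next().await.unwrap() {
///     println!("read a batch with {} rows", batch.num_rows());
/// }
/// # }
/// ```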
801pub struct ParquetRecordBatchStream<T> {
802    metadata: Arc<ParquetMetaData>,
803
804    schema: SchemaRef,
805
806    row_groups: VecDeque<usize>,
807
808    projection: ProjectionMask,
809
810    batch_size: usize,
811
812    selection: Option<RowSelection>,
813
814    /// This is an option so it can be moved into a future
815    reader_factory: Option<ReaderFactory<T>>,
816
817    state: StreamState<T>,
818}
819
820impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
821    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("ParquetRecordBatchStream")
823            .field("metadata", &self.metadata)
824            .field("schema", &self.schema)
825            .field("batch_size", &self.batch_size)
826            .field("projection", &self.projection)
827            .field("state", &self.state)
828            .finish()
829    }
830}
831
832impl<T> ParquetRecordBatchStream<T> {
833    /// Returns the projected [`SchemaRef`] for reading the parquet file.
834    ///
835    /// Note that the schema metadata will be stripped here. See
836    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
837    pub fn schema(&self) -> &SchemaRef {
838        &self.schema
839    }
840}
841
842impl<T> ParquetRecordBatchStream<T>
843where
844    T: AsyncFileReader + Unpin + Send + 'static,
845{
846    /// Fetches the next row group from the stream.
847    ///
848    /// Users can continue to call this function to get row groups and decode them concurrently.
849    ///
850    /// ## Notes
851    ///
852    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
853    ///
854    /// ## Returns
855    ///
856    /// - `Ok(None)` if the stream has ended.
857    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
858    /// - `Ok(Some(reader))` which holds all the data for the row group.
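    ///
    /// ## Example
    ///
    /// A minimal sketch (using a placeholder `"data.parquet"` path) that fetches
    /// each row group (I/O) and then decodes its batches (CPU) one at a time:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    /// while let Some(reader) = stream.next_row_group().await.unwrap() {
    ///     // `reader` is a synchronous `ParquetRecordBatchReader` over buffered data
    ///     for batch in reader {
    ///         println!("read {} rows", batch.unwrap().num_rows());
    ///     }
    /// }
    /// # }
    /// ```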
859    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
860        loop {
861            match &mut self.state {
862                StreamState::Decoding(_) | StreamState::Reading(_) => {
863                    return Err(ParquetError::General(
864                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
865                    ));
866                }
867                StreamState::Init => {
868                    let row_group_idx = match self.row_groups.pop_front() {
869                        Some(idx) => idx,
870                        None => return Ok(None),
871                    };
872
873                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
874
875                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
876
877                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
878
879                    let (reader_factory, maybe_reader) = reader_factory
880                        .read_row_group(
881                            row_group_idx,
882                            selection,
883                            self.projection.clone(),
884                            self.batch_size,
885                        )
886                        .await
887                        .inspect_err(|_| {
888                            self.state = StreamState::Error;
889                        })?;
890                    self.reader_factory = Some(reader_factory);
891
892                    if let Some(reader) = maybe_reader {
893                        return Ok(Some(reader));
894                    } else {
895                        // All rows skipped, read next row group
896                        continue;
897                    }
898                }
899                StreamState::Error => return Ok(None), // Ends the stream as error happens.
900            }
901        }
902    }
903}
904
905impl<T> Stream for ParquetRecordBatchStream<T>
906where
907    T: AsyncFileReader + Unpin + Send + 'static,
908{
909    type Item = Result<RecordBatch>;
910
911    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
912        loop {
913            match &mut self.state {
914                StreamState::Decoding(batch_reader) => match batch_reader.next() {
915                    Some(Ok(batch)) => {
916                        return Poll::Ready(Some(Ok(batch)));
917                    }
918                    Some(Err(e)) => {
919                        self.state = StreamState::Error;
920                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
921                    }
922                    None => self.state = StreamState::Init,
923                },
924                StreamState::Init => {
925                    let row_group_idx = match self.row_groups.pop_front() {
926                        Some(idx) => idx,
927                        None => return Poll::Ready(None),
928                    };
929
930                    let reader = self.reader_factory.take().expect("lost reader factory");
931
932                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
933
934                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
935
936                    let fut = reader
937                        .read_row_group(
938                            row_group_idx,
939                            selection,
940                            self.projection.clone(),
941                            self.batch_size,
942                        )
943                        .boxed();
944
945                    self.state = StreamState::Reading(fut)
946                }
947                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
948                    Ok((reader_factory, maybe_reader)) => {
949                        self.reader_factory = Some(reader_factory);
950                        match maybe_reader {
951                            // Read records from [`ParquetRecordBatchReader`]
952                            Some(reader) => self.state = StreamState::Decoding(reader),
953                            // All rows skipped, read next row group
954                            None => self.state = StreamState::Init,
955                        }
956                    }
957                    Err(e) => {
958                        self.state = StreamState::Error;
959                        return Poll::Ready(Some(Err(e)));
960                    }
961                },
962                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
963            }
964        }
965    }
966}
967
968// Note this implementation is not with the rest of the InMemoryRowGroup
969// implementation because it relies on several async traits and types
970// that are only available when the "async" feature is enabled.
971impl InMemoryRowGroup<'_> {
972    /// Fetches any additional column data specified in `projection` that is not already
973    /// present in `self.column_chunks`.
974    ///
975    /// If `selection` is provided, only the pages required for the selection
976    /// are fetched. Otherwise, all pages are fetched.
977    pub(crate) async fn fetch<T: AsyncFileReader + Send>(
978        &mut self,
979        input: &mut T,
980        projection: &ProjectionMask,
981        selection: Option<&RowSelection>,
982        batch_size: usize,
983        cache_mask: Option<&ProjectionMask>,
984    ) -> Result<()> {
985        // Figure out what ranges to fetch
986        let FetchRanges {
987            ranges,
988            page_start_offsets,
989        } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
990        // do the actual fetch
991        let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
992        // update our in memory buffers (self.column_chunks) with the fetched data
993        self.fill_column_chunks(projection, page_start_offsets, chunk_data);
994        Ok(())
995    }
996}
997#[cfg(test)]
998mod tests {
999    use super::*;
1000    use crate::arrow::ArrowWriter;
1001    use crate::arrow::arrow_reader::{
1002        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1003    };
1004    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1005    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1006    use crate::file::metadata::ParquetMetaDataReader;
1007    use crate::file::properties::WriterProperties;
1008    use arrow::compute::kernels::cmp::eq;
1009    use arrow::error::Result as ArrowResult;
1010    use arrow_array::builder::{ListBuilder, StringBuilder};
1011    use arrow_array::cast::AsArray;
1012    use arrow_array::types::Int32Type;
1013    use arrow_array::{
1014        Array, ArrayRef, Int8Array, Int32Array, RecordBatchReader, Scalar, StringArray,
1015        StructArray, UInt64Array,
1016    };
1017    use arrow_schema::{DataType, Field, Schema};
1018    use futures::{StreamExt, TryStreamExt};
1019    use rand::{Rng, rng};
1020    use std::collections::HashMap;
1021    use std::sync::{Arc, Mutex};
1022    use tempfile::tempfile;
1023
1024    #[derive(Clone)]
1025    struct TestReader {
1026        data: Bytes,
1027        metadata: Option<Arc<ParquetMetaData>>,
1028        requests: Arc<Mutex<Vec<Range<usize>>>>,
1029    }
1030
1031    impl TestReader {
1032        fn new(data: Bytes) -> Self {
1033            Self {
1034                data,
1035                metadata: Default::default(),
1036                requests: Default::default(),
1037            }
1038        }
1039    }
1040
1041    impl AsyncFileReader for TestReader {
1042        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1043            let range = range.clone();
1044            self.requests
1045                .lock()
1046                .unwrap()
1047                .push(range.start as usize..range.end as usize);
1048            futures::future::ready(Ok(self
1049                .data
1050                .slice(range.start as usize..range.end as usize)))
1051            .boxed()
1052        }
1053
1054        fn get_metadata<'a>(
1055            &'a mut self,
1056            options: Option<&'a ArrowReaderOptions>,
1057        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1058            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1059                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1060            );
1061            self.metadata = Some(Arc::new(
1062                metadata_reader.parse_and_finish(&self.data).unwrap(),
1063            ));
1064            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1065        }
1066    }
1067
1068    #[tokio::test]
1069    async fn test_async_reader() {
1070        let testdata = arrow::util::test_util::parquet_test_data();
1071        let path = format!("{testdata}/alltypes_plain.parquet");
1072        let data = Bytes::from(std::fs::read(path).unwrap());
1073
1074        let async_reader = TestReader::new(data.clone());
1075
1076        let requests = async_reader.requests.clone();
1077        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1078            .await
1079            .unwrap();
1080
1081        let metadata = builder.metadata().clone();
1082        assert_eq!(metadata.num_row_groups(), 1);
1083
1084        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1085        let stream = builder
1086            .with_projection(mask.clone())
1087            .with_batch_size(1024)
1088            .build()
1089            .unwrap();
1090
1091        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1092
1093        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1094            .unwrap()
1095            .with_projection(mask)
1096            .with_batch_size(104)
1097            .build()
1098            .unwrap()
1099            .collect::<ArrowResult<Vec<_>>>()
1100            .unwrap();
1101
1102        assert_eq!(async_batches, sync_batches);
1103
1104        let requests = requests.lock().unwrap();
1105        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1106        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1107
1108        assert_eq!(
1109            &requests[..],
1110            &[
1111                offset_1 as usize..(offset_1 + length_1) as usize,
1112                offset_2 as usize..(offset_2 + length_2) as usize
1113            ]
1114        );
1115    }
1116
1117    #[tokio::test]
1118    async fn test_async_reader_with_next_row_group() {
1119        let testdata = arrow::util::test_util::parquet_test_data();
1120        let path = format!("{testdata}/alltypes_plain.parquet");
1121        let data = Bytes::from(std::fs::read(path).unwrap());
1122
1123        let async_reader = TestReader::new(data.clone());
1124
1125        let requests = async_reader.requests.clone();
1126        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1127            .await
1128            .unwrap();
1129
1130        let metadata = builder.metadata().clone();
1131        assert_eq!(metadata.num_row_groups(), 1);
1132
1133        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1134        let mut stream = builder
1135            .with_projection(mask.clone())
1136            .with_batch_size(1024)
1137            .build()
1138            .unwrap();
1139
1140        let mut readers = vec![];
1141        while let Some(reader) = stream.next_row_group().await.unwrap() {
1142            readers.push(reader);
1143        }
1144
1145        let async_batches: Vec<_> = readers
1146            .into_iter()
1147            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1148            .collect();
1149
1150        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1151            .unwrap()
1152            .with_projection(mask)
1153            .with_batch_size(104)
1154            .build()
1155            .unwrap()
1156            .collect::<ArrowResult<Vec<_>>>()
1157            .unwrap();
1158
1159        assert_eq!(async_batches, sync_batches);
1160
1161        let requests = requests.lock().unwrap();
1162        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1163        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1164
1165        assert_eq!(
1166            &requests[..],
1167            &[
1168                offset_1 as usize..(offset_1 + length_1) as usize,
1169                offset_2 as usize..(offset_2 + length_2) as usize
1170            ]
1171        );
1172    }
1173
1174    #[tokio::test]
1175    async fn test_async_reader_with_index() {
1176        let testdata = arrow::util::test_util::parquet_test_data();
1177        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1178        let data = Bytes::from(std::fs::read(path).unwrap());
1179
1180        let async_reader = TestReader::new(data.clone());
1181
1182        let options = ArrowReaderOptions::new().with_page_index(true);
1183        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1184            .await
1185            .unwrap();
1186
1187        // The builder should have page and offset indexes loaded now
1188        let metadata_with_index = builder.metadata();
1189        assert_eq!(metadata_with_index.num_row_groups(), 1);
1190
1191        // Check offset indexes are present for all columns
1192        let offset_index = metadata_with_index.offset_index().unwrap();
1193        let column_index = metadata_with_index.column_index().unwrap();
1194
1195        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1196        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1197
1198        let num_columns = metadata_with_index
1199            .file_metadata()
1200            .schema_descr()
1201            .num_columns();
1202
1203        // Check page indexes are present for all columns
1204        offset_index
1205            .iter()
1206            .for_each(|x| assert_eq!(x.len(), num_columns));
1207        column_index
1208            .iter()
1209            .for_each(|x| assert_eq!(x.len(), num_columns));
1210
1211        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1212        let stream = builder
1213            .with_projection(mask.clone())
1214            .with_batch_size(1024)
1215            .build()
1216            .unwrap();
1217
1218        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1219
1220        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1221            .unwrap()
1222            .with_projection(mask)
1223            .with_batch_size(1024)
1224            .build()
1225            .unwrap()
1226            .collect::<ArrowResult<Vec<_>>>()
1227            .unwrap();
1228
1229        assert_eq!(async_batches, sync_batches);
1230    }
1231
1232    #[tokio::test]
1233    async fn test_async_reader_with_limit() {
1234        let testdata = arrow::util::test_util::parquet_test_data();
1235        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1236        let data = Bytes::from(std::fs::read(path).unwrap());
1237
1238        let metadata = ParquetMetaDataReader::new()
1239            .parse_and_finish(&data)
1240            .unwrap();
1241        let metadata = Arc::new(metadata);
1242
1243        assert_eq!(metadata.num_row_groups(), 1);
1244
1245        let async_reader = TestReader::new(data.clone());
1246
1247        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1248            .await
1249            .unwrap();
1250
1251        assert_eq!(builder.metadata().num_row_groups(), 1);
1252
1253        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1254        let stream = builder
1255            .with_projection(mask.clone())
1256            .with_batch_size(1024)
1257            .with_limit(1)
1258            .build()
1259            .unwrap();
1260
1261        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1262
1263        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1264            .unwrap()
1265            .with_projection(mask)
1266            .with_batch_size(1024)
1267            .with_limit(1)
1268            .build()
1269            .unwrap()
1270            .collect::<ArrowResult<Vec<_>>>()
1271            .unwrap();
1272
1273        assert_eq!(async_batches, sync_batches);
1274    }
1275
1276    #[tokio::test]
1277    async fn test_async_reader_skip_pages() {
1278        let testdata = arrow::util::test_util::parquet_test_data();
1279        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1280        let data = Bytes::from(std::fs::read(path).unwrap());
1281
1282        let async_reader = TestReader::new(data.clone());
1283
1284        let options = ArrowReaderOptions::new().with_page_index(true);
1285        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1286            .await
1287            .unwrap();
1288
1289        assert_eq!(builder.metadata().num_row_groups(), 1);
1290
1291        let selection = RowSelection::from(vec![
1292            RowSelector::skip(21),   // Skip first page
1293            RowSelector::select(21), // Select page to boundary
1294            RowSelector::skip(41),   // Skip multiple pages
1295            RowSelector::select(41), // Select multiple pages
1296            RowSelector::skip(25),   // Skip page across boundary
1297            RowSelector::select(25), // Select across page boundary
1298            RowSelector::skip(7116), // Skip to final page boundary
1299            RowSelector::select(10), // Select final page
1300        ]);
1301
1302        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1303
1304        let stream = builder
1305            .with_projection(mask.clone())
1306            .with_row_selection(selection.clone())
1307            .build()
1308            .expect("building stream");
1309
1310        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1311
1312        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1313            .unwrap()
1314            .with_projection(mask)
1315            .with_batch_size(1024)
1316            .with_row_selection(selection)
1317            .build()
1318            .unwrap()
1319            .collect::<ArrowResult<Vec<_>>>()
1320            .unwrap();
1321
1322        assert_eq!(async_batches, sync_batches);
1323    }
1324
1325    #[tokio::test]
1326    async fn test_fuzz_async_reader_selection() {
1327        let testdata = arrow::util::test_util::parquet_test_data();
1328        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1329        let data = Bytes::from(std::fs::read(path).unwrap());
1330
1331        let mut rand = rng();
1332
1333        for _ in 0..100 {
1334            let mut expected_rows = 0;
1335            let mut total_rows = 0;
1336            let mut skip = false;
1337            let mut selectors = vec![];
1338
1339            while total_rows < 7300 {
1340                let row_count: usize = rand.random_range(1..100);
1341
1342                let row_count = row_count.min(7300 - total_rows);
1343
1344                selectors.push(RowSelector { row_count, skip });
1345
1346                total_rows += row_count;
1347                if !skip {
1348                    expected_rows += row_count;
1349                }
1350
1351                skip = !skip;
1352            }
1353
1354            let selection = RowSelection::from(selectors);
1355
1356            let async_reader = TestReader::new(data.clone());
1357
1358            let options = ArrowReaderOptions::new().with_page_index(true);
1359            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1360                .await
1361                .unwrap();
1362
1363            assert_eq!(builder.metadata().num_row_groups(), 1);
1364
1365            let col_idx: usize = rand.random_range(0..13);
1366            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1367
1368            let stream = builder
1369                .with_projection(mask.clone())
1370                .with_row_selection(selection.clone())
1371                .build()
1372                .expect("building stream");
1373
1374            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1375
1376            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1377
1378            assert_eq!(actual_rows, expected_rows);
1379        }
1380    }
1381
1382    #[tokio::test]
1383    async fn test_async_reader_zero_row_selector() {
1384        // See https://github.com/apache/arrow-rs/issues/2669
1385        let testdata = arrow::util::test_util::parquet_test_data();
1386        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1387        let data = Bytes::from(std::fs::read(path).unwrap());
1388
1389        let mut rand = rng();
1390
1391        let mut expected_rows = 0;
1392        let mut total_rows = 0;
1393        let mut skip = false;
1394        let mut selectors = vec![];
1395
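            // Start with a zero-row selector to exercise the edge case from the issue above.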
1396        selectors.push(RowSelector {
1397            row_count: 0,
1398            skip: false,
1399        });
1400
1401        while total_rows < 7300 {
1402            let row_count: usize = rand.random_range(1..100);
1403
1404            let row_count = row_count.min(7300 - total_rows);
1405
1406            selectors.push(RowSelector { row_count, skip });
1407
1408            total_rows += row_count;
1409            if !skip {
1410                expected_rows += row_count;
1411            }
1412
1413            skip = !skip;
1414        }
1415
1416        let selection = RowSelection::from(selectors);
1417
1418        let async_reader = TestReader::new(data.clone());
1419
1420        let options = ArrowReaderOptions::new().with_page_index(true);
1421        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1422            .await
1423            .unwrap();
1424
1425        assert_eq!(builder.metadata().num_row_groups(), 1);
1426
1427        let col_idx: usize = rand.random_range(0..13);
1428        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1429
1430        let stream = builder
1431            .with_projection(mask.clone())
1432            .with_row_selection(selection.clone())
1433            .build()
1434            .expect("building stream");
1435
1436        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1437
1438        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1439
1440        assert_eq!(actual_rows, expected_rows);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_row_filter() {
1445        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1446        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1447        let data = RecordBatch::try_from_iter([
1448            ("a", Arc::new(a) as ArrayRef),
1449            ("b", Arc::new(b) as ArrayRef),
1450        ])
1451        .unwrap();
1452
1453        let mut buf = Vec::with_capacity(1024);
1454        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1455        writer.write(&data).unwrap();
1456        writer.close().unwrap();
1457
1458        let data: Bytes = buf.into();
1459        let metadata = ParquetMetaDataReader::new()
1460            .parse_and_finish(&data)
1461            .unwrap();
1462        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1463
1464        let test = TestReader::new(data);
1465        let requests = test.requests.clone();
1466
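            // Predicate: keep only the rows where column "a" equals "b"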
1467        let a_scalar = StringArray::from_iter_values(["b"]);
1468        let a_filter = ArrowPredicateFn::new(
1469            ProjectionMask::leaves(&parquet_schema, vec![0]),
1470            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1471        );
1472
1473        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1474
1475        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1476        let stream = ParquetRecordBatchStreamBuilder::new(test)
1477            .await
1478            .unwrap()
1479            .with_projection(mask.clone())
1480            .with_batch_size(1024)
1481            .with_row_filter(filter)
1482            .build()
1483            .unwrap();
1484
1485        let batches: Vec<_> = stream.try_collect().await.unwrap();
1486        assert_eq!(batches.len(), 1);
1487
1488        let batch = &batches[0];
1489        assert_eq!(batch.num_columns(), 2);
1490
1491        // Filter should have kept only rows with "b" in column 0
1492        assert_eq!(
1493            batch.column(0).as_ref(),
1494            &StringArray::from_iter_values(["b", "b", "b"])
1495        );
1496        assert_eq!(
1497            batch.column(1).as_ref(),
1498            &StringArray::from_iter_values(["2", "3", "4"])
1499        );
1500
1501        // Should only have made 2 requests:
1502        // * First request fetches data for evaluating the predicate
1503        // * Second request fetches data for evaluating the projection
1504        assert_eq!(requests.lock().unwrap().len(), 2);
1505    }
1506
1507    #[tokio::test]
1508    async fn test_two_row_filters() {
1509        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1510        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1511        let c = Int32Array::from_iter(0..6);
1512        let data = RecordBatch::try_from_iter([
1513            ("a", Arc::new(a) as ArrayRef),
1514            ("b", Arc::new(b) as ArrayRef),
1515            ("c", Arc::new(c) as ArrayRef),
1516        ])
1517        .unwrap();
1518
1519        let mut buf = Vec::with_capacity(1024);
1520        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1521        writer.write(&data).unwrap();
1522        writer.close().unwrap();
1523
1524        let data: Bytes = buf.into();
1525        let metadata = ParquetMetaDataReader::new()
1526            .parse_and_finish(&data)
1527            .unwrap();
1528        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1529
1530        let test = TestReader::new(data);
1531        let requests = test.requests.clone();
1532
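            // Two predicates evaluated in sequence: first a == "b", then b == "4";
            // only the row at index 3 satisfies both.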
1533        let a_scalar = StringArray::from_iter_values(["b"]);
1534        let a_filter = ArrowPredicateFn::new(
1535            ProjectionMask::leaves(&parquet_schema, vec![0]),
1536            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1537        );
1538
1539        let b_scalar = StringArray::from_iter_values(["4"]);
1540        let b_filter = ArrowPredicateFn::new(
1541            ProjectionMask::leaves(&parquet_schema, vec![1]),
1542            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1543        );
1544
1545        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1546
1547        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1548        let stream = ParquetRecordBatchStreamBuilder::new(test)
1549            .await
1550            .unwrap()
1551            .with_projection(mask.clone())
1552            .with_batch_size(1024)
1553            .with_row_filter(filter)
1554            .build()
1555            .unwrap();
1556
1557        let batches: Vec<_> = stream.try_collect().await.unwrap();
1558        assert_eq!(batches.len(), 1);
1559
1560        let batch = &batches[0];
1561        assert_eq!(batch.num_rows(), 1);
1562        assert_eq!(batch.num_columns(), 2);
1563
1564        let col = batch.column(0);
1565        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1566        assert_eq!(val, "b");
1567
1568        let col = batch.column(1);
1569        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1570        assert_eq!(val, 3);
1571
1572        // Should only have made 3 requests
1573        // * First request fetches data for evaluating the first predicate
1574        // * Second request fetches data for evaluating the second predicate
1575        // * Third request fetches data for evaluating the projection
1576        assert_eq!(requests.lock().unwrap().len(), 3);
1577    }
1578
1579    #[tokio::test]
1580    async fn test_limit_multiple_row_groups() {
1581        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1582        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1583        let c = Int32Array::from_iter(0..6);
1584        let data = RecordBatch::try_from_iter([
1585            ("a", Arc::new(a) as ArrayRef),
1586            ("b", Arc::new(b) as ArrayRef),
1587            ("c", Arc::new(c) as ArrayRef),
1588        ])
1589        .unwrap();
1590
1591        let mut buf = Vec::with_capacity(1024);
1592        let props = WriterProperties::builder()
1593            .set_max_row_group_size(3)
1594            .build();
1595        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1596        writer.write(&data).unwrap();
1597        writer.close().unwrap();
1598
1599        let data: Bytes = buf.into();
1600        let metadata = ParquetMetaDataReader::new()
1601            .parse_and_finish(&data)
1602            .unwrap();
1603
1604        assert_eq!(metadata.num_row_groups(), 2);
1605
1606        let test = TestReader::new(data);
1607
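            // Two row groups of 3 rows each were written above; with a limit of 4, the
            // first query reads the first group in full and only one row from the second.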
1608        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1609            .await
1610            .unwrap()
1611            .with_batch_size(1024)
1612            .with_limit(4)
1613            .build()
1614            .unwrap();
1615
1616        let batches: Vec<_> = stream.try_collect().await.unwrap();
1617        // Expect one batch for each row group
1618        assert_eq!(batches.len(), 2);
1619
1620        let batch = &batches[0];
1621        // First batch should contain all rows of the first row group
1622        assert_eq!(batch.num_rows(), 3);
1623        assert_eq!(batch.num_columns(), 3);
1624        let col2 = batch.column(2).as_primitive::<Int32Type>();
1625        assert_eq!(col2.values(), &[0, 1, 2]);
1626
1627        let batch = &batches[1];
1628        // Second batch should trigger the limit and only have one row
1629        assert_eq!(batch.num_rows(), 1);
1630        assert_eq!(batch.num_columns(), 3);
1631        let col2 = batch.column(2).as_primitive::<Int32Type>();
1632        assert_eq!(col2.values(), &[3]);
1633
1634        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1635            .await
1636            .unwrap()
1637            .with_offset(2)
1638            .with_limit(3)
1639            .build()
1640            .unwrap();
1641
1642        let batches: Vec<_> = stream.try_collect().await.unwrap();
1643        // Expect one batch for each row group
1644        assert_eq!(batches.len(), 2);
1645
1646        let batch = &batches[0];
1647        // First batch should contain one row
1648        assert_eq!(batch.num_rows(), 1);
1649        assert_eq!(batch.num_columns(), 3);
1650        let col2 = batch.column(2).as_primitive::<Int32Type>();
1651        assert_eq!(col2.values(), &[2]);
1652
1653        let batch = &batches[1];
1654        // Second batch should contain two rows
1655        assert_eq!(batch.num_rows(), 2);
1656        assert_eq!(batch.num_columns(), 3);
1657        let col2 = batch.column(2).as_primitive::<Int32Type>();
1658        assert_eq!(col2.values(), &[3, 4]);
1659
1660        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1661            .await
1662            .unwrap()
1663            .with_offset(4)
1664            .with_limit(20)
1665            .build()
1666            .unwrap();
1667
1668        let batches: Vec<_> = stream.try_collect().await.unwrap();
1669        // Should skip first row group
1670        assert_eq!(batches.len(), 1);
1671
1672        let batch = &batches[0];
1673        // First batch should contain two rows
1674        assert_eq!(batch.num_rows(), 2);
1675        assert_eq!(batch.num_columns(), 3);
1676        let col2 = batch.column(2).as_primitive::<Int32Type>();
1677        assert_eq!(col2.values(), &[4, 5]);
1678    }
1679
1680    #[tokio::test]
1681    async fn test_row_filter_with_index() {
1682        let testdata = arrow::util::test_util::parquet_test_data();
1683        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1684        let data = Bytes::from(std::fs::read(path).unwrap());
1685
1686        let metadata = ParquetMetaDataReader::new()
1687            .parse_and_finish(&data)
1688            .unwrap();
1689        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1690
1691        assert_eq!(metadata.num_row_groups(), 1);
1692
1693        let async_reader = TestReader::new(data.clone());
1694
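            // The first predicate keeps rows where the boolean leaf (index 1) is true;
            // the second keeps rows where the Int8 leaf (index 2) equals 2.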
1695        let a_filter =
1696            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1697                Ok(batch.column(0).as_boolean().clone())
1698            });
1699
1700        let b_scalar = Int8Array::from(vec![2]);
1701        let b_filter = ArrowPredicateFn::new(
1702            ProjectionMask::leaves(&parquet_schema, vec![2]),
1703            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1704        );
1705
1706        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1707
1708        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1709
1710        let options = ArrowReaderOptions::new().with_page_index(true);
1711        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1712            .await
1713            .unwrap()
1714            .with_projection(mask.clone())
1715            .with_batch_size(1024)
1716            .with_row_filter(filter)
1717            .build()
1718            .unwrap();
1719
1720        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1721
1722        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1723
1724        assert_eq!(total_rows, 730);
1725    }
1726
1727    #[tokio::test]
1728    #[allow(deprecated)]
1729    async fn test_in_memory_row_group_sparse() {
1730        let testdata = arrow::util::test_util::parquet_test_data();
1731        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1732        let data = Bytes::from(std::fs::read(path).unwrap());
1733
1734        let metadata = ParquetMetaDataReader::new()
1735            .with_page_indexes(true)
1736            .parse_and_finish(&data)
1737            .unwrap();
1738
1739        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1740
1741        let mut metadata_builder = metadata.into_builder();
1742        let mut row_groups = metadata_builder.take_row_groups();
1743        row_groups.truncate(1);
1744        let row_group_meta = row_groups.pop().unwrap();
1745
1746        let metadata = metadata_builder
1747            .add_row_group(row_group_meta)
1748            .set_column_index(None)
1749            .set_offset_index(Some(vec![offset_index.clone()]))
1750            .build();
1751
1752        let metadata = Arc::new(metadata);
1753
1754        let num_rows = metadata.row_group(0).num_rows();
1755
1756        assert_eq!(metadata.num_row_groups(), 1);
1757
1758        let async_reader = TestReader::new(data.clone());
1759
1760        let requests = async_reader.requests.clone();
1761        let (_, fields) = parquet_to_arrow_schema_and_fields(
1762            metadata.file_metadata().schema_descr(),
1763            ProjectionMask::all(),
1764            None,
1765        )
1766        .unwrap();
1767
1768        let _schema_desc = metadata.file_metadata().schema_descr();
1769
1770        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1771
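            // Build a ReaderFactory directly (rather than through the stream builder) so
            // the test can call read_row_group with a custom selection and assert on the
            // exact byte ranges fetched from the underlying reader.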
1772        let reader_factory = ReaderFactory {
1773            metadata,
1774            fields: fields.map(Arc::new),
1775            input: async_reader,
1776            filter: None,
1777            limit: None,
1778            offset: None,
1779            metrics: ArrowReaderMetrics::disabled(),
1780            max_predicate_cache_size: 0,
1781        };
1782
1783        let mut skip = true;
1784        let mut pages = offset_index[0].page_locations.iter().peekable();
1785
1786        // Set up the `RowSelection` so that every other page is skipped, with the last page selected
1787        let mut selectors = vec![];
1788        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1789        while let Some(page) = pages.next() {
1790            let num_rows = if let Some(next_page) = pages.peek() {
1791                next_page.first_row_index - page.first_row_index
1792            } else {
1793                num_rows - page.first_row_index
1794            };
1795
1796            if skip {
1797                selectors.push(RowSelector::skip(num_rows as usize));
1798            } else {
1799                selectors.push(RowSelector::select(num_rows as usize));
1800                let start = page.offset as usize;
1801                let end = start + page.compressed_page_size as usize;
1802                expected_page_requests.push(start..end);
1803            }
1804            skip = !skip;
1805        }
1806
1807        let selection = RowSelection::from(selectors);
1808
1809        let (_factory, _reader) = reader_factory
1810            .read_row_group(0, Some(selection), projection.clone(), 48)
1811            .await
1812            .expect("reading row group");
1813
1814        let requests = requests.lock().unwrap();
1815
1816        assert_eq!(&requests[..], &expected_page_requests)
1817    }
1818
1819    #[tokio::test]
1820    async fn test_batch_size_overallocate() {
1821        let testdata = arrow::util::test_util::parquet_test_data();
1822        // `alltypes_plain.parquet` only has 8 rows
1823        let path = format!("{testdata}/alltypes_plain.parquet");
1824        let data = Bytes::from(std::fs::read(path).unwrap());
1825
1826        let async_reader = TestReader::new(data.clone());
1827
1828        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1829            .await
1830            .unwrap();
1831
1832        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1833
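            // The requested batch size (1024) is larger than the number of rows in the
            // file, so the builder should clamp the stream's batch size to the file row
            // count rather than over-allocating.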
1834        let stream = builder
1835            .with_projection(ProjectionMask::all())
1836            .with_batch_size(1024)
1837            .build()
1838            .unwrap();
1839        assert_ne!(1024, file_rows);
1840        assert_eq!(stream.batch_size, file_rows);
1841    }
1842
1843    #[tokio::test]
1844    async fn test_get_row_group_column_bloom_filter_without_length() {
1845        let testdata = arrow::util::test_util::parquet_test_data();
1846        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1847        let data = Bytes::from(std::fs::read(path).unwrap());
1848        test_get_row_group_column_bloom_filter(data, false).await;
1849    }
1850
1851    #[tokio::test]
1852    async fn test_parquet_record_batch_stream_schema() {
1853        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1854            schema.flattened_fields().iter().map(|f| f.name()).collect()
1855        }
1856
1857        // ParquetRecordBatchReaderBuilder::schema differs from
1858        // ParquetRecordBatchReader::schema and RecordBatch::schema in what the
1859        // returned schema contains (custom metadata attached to the schema, and
1860        // which fields are included). This test ensures that behaviour remains
1861        // consistent.
1862        // Ensure the asynchronous versions of the above behave the same.
1863
1864        // Prep data, for a schema with nested fields, with custom metadata
1865        let mut metadata = HashMap::with_capacity(1);
1866        metadata.insert("key".to_string(), "value".to_string());
1867
1868        let nested_struct_array = StructArray::from(vec![
1869            (
1870                Arc::new(Field::new("d", DataType::Utf8, true)),
1871                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1872            ),
1873            (
1874                Arc::new(Field::new("e", DataType::Utf8, true)),
1875                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1876            ),
1877        ]);
1878        let struct_array = StructArray::from(vec![
1879            (
1880                Arc::new(Field::new("a", DataType::Int32, true)),
1881                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1882            ),
1883            (
1884                Arc::new(Field::new("b", DataType::UInt64, true)),
1885                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1886            ),
1887            (
1888                Arc::new(Field::new(
1889                    "c",
1890                    nested_struct_array.data_type().clone(),
1891                    true,
1892                )),
1893                Arc::new(nested_struct_array) as ArrayRef,
1894            ),
1895        ]);
1896
1897        let schema =
1898            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1899        let record_batch = RecordBatch::from(struct_array)
1900            .with_schema(schema.clone())
1901            .unwrap();
1902
1903        // Write parquet with custom metadata in schema
1904        let mut file = tempfile().unwrap();
1905        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1906        writer.write(&record_batch).unwrap();
1907        writer.close().unwrap();
1908
1909        let all_fields = ["a", "b", "c", "d", "e"];
1910        // (leaf indices in the projection mask, expected field names in the output schema)
1911        let projections = [
1912            (vec![], vec![]),
1913            (vec![0], vec!["a"]),
1914            (vec![0, 1], vec!["a", "b"]),
1915            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1916            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1917        ];
1918
1919        // Ensure we're consistent for each of these projections
1920        for (indices, expected_projected_names) in projections {
1921            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1922                // Builder schema should preserve all fields and metadata
1923                assert_eq!(get_all_field_names(&builder), all_fields);
1924                assert_eq!(builder.metadata, metadata);
1925                // Reader & batch schema should show only projected fields, and no metadata
1926                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1927                assert_eq!(reader.metadata, HashMap::default());
1928                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1929                assert_eq!(batch.metadata, HashMap::default());
1930            };
1931
1932            let builder =
1933                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1934            let sync_builder_schema = builder.schema().clone();
1935            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1936            let mut reader = builder.with_projection(mask).build().unwrap();
1937            let sync_reader_schema = reader.schema();
1938            let batch = reader.next().unwrap().unwrap();
1939            let sync_batch_schema = batch.schema();
1940            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1941
1942            // asynchronous should be same
1943            let file = tokio::fs::File::from(file.try_clone().unwrap());
1944            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1945            let async_builder_schema = builder.schema().clone();
1946            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1947            let mut reader = builder.with_projection(mask).build().unwrap();
1948            let async_reader_schema = reader.schema().clone();
1949            let batch = reader.next().await.unwrap().unwrap();
1950            let async_batch_schema = batch.schema();
1951            assert_schemas(
1952                async_builder_schema,
1953                async_reader_schema,
1954                async_batch_schema,
1955            );
1956        }
1957    }
1958
1959    #[tokio::test]
1960    async fn test_get_row_group_column_bloom_filter_with_length() {
1961        // re-write the data into a new parquet file that records bloom_filter_length
1962        let testdata = arrow::util::test_util::parquet_test_data();
1963        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1964        let data = Bytes::from(std::fs::read(path).unwrap());
1965        let async_reader = TestReader::new(data.clone());
1966        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1967            .await
1968            .unwrap();
1969        let schema = builder.schema().clone();
1970        let stream = builder.build().unwrap();
1971        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1972
1973        let mut parquet_data = Vec::new();
1974        let props = WriterProperties::builder()
1975            .set_bloom_filter_enabled(true)
1976            .build();
1977        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1978        for batch in batches {
1979            writer.write(&batch).unwrap();
1980        }
1981        writer.close().unwrap();
1982
1983        // test the new parquet file
1984        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1985    }
1986
1987    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1988        let async_reader = TestReader::new(data.clone());
1989
1990        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1991            .await
1992            .unwrap();
1993
1994        let metadata = builder.metadata();
1995        assert_eq!(metadata.num_row_groups(), 1);
1996        let row_group = metadata.row_group(0);
1997        let column = row_group.column(0);
1998        assert_eq!(column.bloom_filter_length().is_some(), with_length);
1999
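            // Reading the bloom filter should succeed whether or not the column metadata
            // records bloom_filter_length.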
2000        let sbbf = builder
2001            .get_row_group_column_bloom_filter(0, 0)
2002            .await
2003            .unwrap()
2004            .unwrap();
2005        assert!(sbbf.check(&"Hello"));
2006        assert!(!sbbf.check(&"Hello_Not_Exists"));
2007    }
2008
2009    #[tokio::test]
2010    async fn test_nested_skip() {
2011        let schema = Arc::new(Schema::new(vec![
2012            Field::new("col_1", DataType::UInt64, false),
2013            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2014        ]));
2015
2016        // Writer properties that limit data pages to 256 rows and row groups to 1024 rows
2017        let props = WriterProperties::builder()
2018            .set_data_page_row_count_limit(256)
2019            .set_write_batch_size(256)
2020            .set_max_row_group_size(1024);
2021
2022        // Write data
2023        let mut file = tempfile().unwrap();
2024        let mut writer =
2025            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2026
2027        let mut builder = ListBuilder::new(StringBuilder::new());
2028        for id in 0..1024 {
2029            match id % 3 {
2030                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2031                1 => builder.append_value([Some(format!("id_{id}"))]),
2032                _ => builder.append_null(),
2033            }
2034        }
2035        let refs = vec![
2036            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2037            Arc::new(builder.finish()) as ArrayRef,
2038        ];
2039
2040        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2041        writer.write(&batch).unwrap();
2042        writer.close().unwrap();
2043
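            // Each selection exercises skips that start or end near page boundaries
            // (pages are limited to 256 rows) for both the flat and nested list columns.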
2044        let selections = [
2045            RowSelection::from(vec![
2046                RowSelector::skip(313),
2047                RowSelector::select(1),
2048                RowSelector::skip(709),
2049                RowSelector::select(1),
2050            ]),
2051            RowSelection::from(vec![
2052                RowSelector::skip(255),
2053                RowSelector::select(1),
2054                RowSelector::skip(767),
2055                RowSelector::select(1),
2056            ]),
2057            RowSelection::from(vec![
2058                RowSelector::select(255),
2059                RowSelector::skip(1),
2060                RowSelector::select(767),
2061                RowSelector::skip(1),
2062            ]),
2063            RowSelection::from(vec![
2064                RowSelector::skip(254),
2065                RowSelector::select(1),
2066                RowSelector::select(1),
2067                RowSelector::skip(767),
2068                RowSelector::select(1),
2069            ]),
2070        ];
2071
2072        for selection in selections {
2073            let expected = selection.row_count();
2074            // Read data
2075            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2076                tokio::fs::File::from_std(file.try_clone().unwrap()),
2077                ArrowReaderOptions::new().with_page_index(true),
2078            )
2079            .await
2080            .unwrap();
2081
2082            reader = reader.with_row_selection(selection);
2083
2084            let mut stream = reader.build().unwrap();
2085
2086            let mut total_rows = 0;
2087            while let Some(rb) = stream.next().await {
2088                let rb = rb.unwrap();
2089                total_rows += rb.num_rows();
2090            }
2091            assert_eq!(total_rows, expected);
2092        }
2093    }
2094
2095    #[tokio::test]
2096    async fn test_row_filter_nested() {
2097        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2098        let b = StructArray::from(vec![
2099            (
2100                Arc::new(Field::new("aa", DataType::Utf8, true)),
2101                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2102            ),
2103            (
2104                Arc::new(Field::new("bb", DataType::Utf8, true)),
2105                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2106            ),
2107        ]);
2108        let c = Int32Array::from_iter(0..6);
2109        let data = RecordBatch::try_from_iter([
2110            ("a", Arc::new(a) as ArrayRef),
2111            ("b", Arc::new(b) as ArrayRef),
2112            ("c", Arc::new(c) as ArrayRef),
2113        ])
2114        .unwrap();
2115
2116        let mut buf = Vec::with_capacity(1024);
2117        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2118        writer.write(&data).unwrap();
2119        writer.close().unwrap();
2120
2121        let data: Bytes = buf.into();
2122        let metadata = ParquetMetaDataReader::new()
2123            .parse_and_finish(&data)
2124            .unwrap();
2125        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2126
2127        let test = TestReader::new(data);
2128        let requests = test.requests.clone();
2129
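            // Same two-predicate pattern as test_two_row_filters, except the second
            // predicate projects a leaf inside the struct column "b".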
2130        let a_scalar = StringArray::from_iter_values(["b"]);
2131        let a_filter = ArrowPredicateFn::new(
2132            ProjectionMask::leaves(&parquet_schema, vec![0]),
2133            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2134        );
2135
2136        let b_scalar = StringArray::from_iter_values(["4"]);
2137        let b_filter = ArrowPredicateFn::new(
2138            ProjectionMask::leaves(&parquet_schema, vec![2]),
2139            move |batch| {
2140                // Filter on the struct's second field (`bb`); after projection it is
2140                // the only child column of the struct.
2141                let struct_array = batch
2142                    .column(0)
2143                    .as_any()
2144                    .downcast_ref::<StructArray>()
2145                    .unwrap();
2146                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2147            },
2148        );
2149
2150        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2151
2152        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2153        let stream = ParquetRecordBatchStreamBuilder::new(test)
2154            .await
2155            .unwrap()
2156            .with_projection(mask.clone())
2157            .with_batch_size(1024)
2158            .with_row_filter(filter)
2159            .build()
2160            .unwrap();
2161
2162        let batches: Vec<_> = stream.try_collect().await.unwrap();
2163        assert_eq!(batches.len(), 1);
2164
2165        let batch = &batches[0];
2166        assert_eq!(batch.num_rows(), 1);
2167        assert_eq!(batch.num_columns(), 2);
2168
2169        let col = batch.column(0);
2170        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2171        assert_eq!(val, "b");
2172
2173        let col = batch.column(1);
2174        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2175        assert_eq!(val, 3);
2176
2177        // Should only have made 3 requests
2178        // * First request fetches data for evaluating the first predicate
2179        // * Second request fetches data for evaluating the second predicate
2180        // * Third request fetches data for evaluating the projection
2181        assert_eq!(requests.lock().unwrap().len(), 3);
2182    }
2183
2184    #[tokio::test]
2185    #[allow(deprecated)]
2186    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2187        use tokio::fs::File;
2188        let testdata = arrow::util::test_util::parquet_test_data();
2189        let path = format!("{testdata}/alltypes_plain.parquet");
2190        let mut file = File::open(&path).await.unwrap();
2191        let file_size = file.metadata().await.unwrap().len();
2192        let mut metadata = ParquetMetaDataReader::new()
2193            .with_page_indexes(true)
2194            .load_and_finish(&mut file, file_size)
2195            .await
2196            .unwrap();
2197
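            // Replace the offset index with an empty vec to simulate metadata without
            // page locations; building and reading the stream should still succeed.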
2198        metadata.set_offset_index(Some(vec![]));
2199        let options = ArrowReaderOptions::new().with_page_index(true);
2200        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2201        let reader =
2202            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2203                .build()
2204                .unwrap();
2205
2206        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2207        assert_eq!(result.len(), 1);
2208    }
2209
2210    #[tokio::test]
2211    #[allow(deprecated)]
2212    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2213        use tokio::fs::File;
2214        let testdata = arrow::util::test_util::parquet_test_data();
2215        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2216        let mut file = File::open(&path).await.unwrap();
2217        let file_size = file.metadata().await.unwrap().len();
2218        let metadata = ParquetMetaDataReader::new()
2219            .with_page_indexes(true)
2220            .load_and_finish(&mut file, file_size)
2221            .await
2222            .unwrap();
2223
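            // Unlike the previous test, keep the real (non-empty) offset index and ensure
            // reading with the page index enabled still works.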
2224        let options = ArrowReaderOptions::new().with_page_index(true);
2225        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2226        let reader =
2227            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2228                .build()
2229                .unwrap();
2230
2231        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2232        assert_eq!(result.len(), 8);
2233    }
2234
2235    #[tokio::test]
2236    #[allow(deprecated)]
2237    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2238        use tempfile::TempDir;
2239        use tokio::fs::File;
2240        fn write_metadata_to_local_file(
2241            metadata: ParquetMetaData,
2242            file: impl AsRef<std::path::Path>,
2243        ) {
2244            use crate::file::metadata::ParquetMetaDataWriter;
2245            use std::fs::File;
2246            let file = File::create(file).unwrap();
2247            ParquetMetaDataWriter::new(file, &metadata)
2248                .finish()
2249                .unwrap()
2250        }
2251
2252        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2253            use std::fs::File;
2254            let file = File::open(file).unwrap();
2255            ParquetMetaDataReader::new()
2256                .with_page_indexes(true)
2257                .parse_and_finish(&file)
2258                .unwrap()
2259        }
2260
2261        let testdata = arrow::util::test_util::parquet_test_data();
2262        let path = format!("{testdata}/alltypes_plain.parquet");
2263        let mut file = File::open(&path).await.unwrap();
2264        let file_size = file.metadata().await.unwrap().len();
2265        let metadata = ParquetMetaDataReader::new()
2266            .with_page_indexes(true)
2267            .load_and_finish(&mut file, file_size)
2268            .await
2269            .unwrap();
2270
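            // Round-trip the metadata through ParquetMetaDataWriter and
            // ParquetMetaDataReader on disk; reading with the reloaded metadata
            // previously panicked in the column chunk reader (see comment below).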
2271        let tempdir = TempDir::new().unwrap();
2272        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2273        write_metadata_to_local_file(metadata, &metadata_path);
2274        let metadata = read_metadata_from_local_file(&metadata_path);
2275
2276        let options = ArrowReaderOptions::new().with_page_index(true);
2277        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2278        let reader =
2279            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2280                .build()
2281                .unwrap();
2282
2283        // Collecting previously panicked here; it should now succeed
2284        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2285        assert_eq!(result.len(), 1);
2286    }
2287
2288    #[tokio::test]
2289    async fn test_cached_array_reader_sparse_offset_error() {
2290        use futures::TryStreamExt;
2291
2292        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2293        use arrow_array::{BooleanArray, RecordBatch};
2294
2295        let testdata = arrow::util::test_util::parquet_test_data();
2296        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2297        let data = Bytes::from(std::fs::read(path).unwrap());
2298
2299        let async_reader = TestReader::new(data);
2300
2301        // Enable page index so the fetch logic loads only required pages
2302        let options = ArrowReaderOptions::new().with_page_index(true);
2303        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2304            .await
2305            .unwrap();
2306
2307        // Skip the first 22 rows (entire first Parquet page) and then select the
2308        // next 3 rows (22, 23, 24). This means the fetch step will not include
2309        // the first page starting at file offset 0.
2310        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2311
2312        // Trivial predicate on column 0 that always returns `true`. Using the
2313        // same column in both predicate and projection activates the caching
2314        // layer (Producer/Consumer pattern).
2315        let parquet_schema = builder.parquet_schema();
2316        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2317        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2318            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2319        });
2320        let filter = RowFilter::new(vec![Box::new(always_true)]);
2321
2322        // Build the stream with batch size 8 so the cache reads whole batches
2323        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2324        let stream = builder
2325            .with_batch_size(8)
2326            .with_projection(proj)
2327            .with_row_selection(selection)
2328            .with_row_filter(filter)
2329            .build()
2330            .unwrap();
2331
2332        // Collecting the stream previously failed with a sparse column chunk offset
2333        // error; it should now succeed.
2334        let _result: Vec<_> = stream.try_collect().await.unwrap();
2335    }
2336
2337    #[tokio::test]
2338    async fn test_predicate_cache_disabled() {
2339        let k = Int32Array::from_iter_values(0..10);
2340        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2341
2342        let mut buf = Vec::new();
2343        // both the page row limit and batch size are set to 1 to create one page per row
2344        let props = WriterProperties::builder()
2345            .set_data_page_row_count_limit(1)
2346            .set_write_batch_size(1)
2347            .set_max_row_group_size(10)
2348            .set_write_page_header_statistics(true)
2349            .build();
2350        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2351        writer.write(&data).unwrap();
2352        writer.close().unwrap();
2353
2354        let data = Bytes::from(buf);
2355        let metadata = ParquetMetaDataReader::new()
2356            .with_page_index_policy(PageIndexPolicy::Required)
2357            .parse_and_finish(&data)
2358            .unwrap();
2359        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2360
2361        // the filter is not cloneable, so use a closure to build a fresh one for each reader
2362        let build_filter = || {
2363            let scalar = Int32Array::from_iter_values([5]);
2364            let predicate = ArrowPredicateFn::new(
2365                ProjectionMask::leaves(&parquet_schema, vec![0]),
2366                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2367            );
2368            RowFilter::new(vec![Box::new(predicate)])
2369        };
2370
2371        // select only one of the pages
2372        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2373
2374        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2375        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2376
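            // Read the same file twice, once with the predicate cache enabled (the
            // default) and once with it disabled, then compare both the decoded batches
            // and the I/O request patterns.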
2377        // using the predicate cache (default)
2378        let reader_with_cache = TestReader::new(data.clone());
2379        let requests_with_cache = reader_with_cache.requests.clone();
2380        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2381            reader_with_cache,
2382            reader_metadata.clone(),
2383        )
2384        .with_batch_size(1000)
2385        .with_row_selection(selection.clone())
2386        .with_row_filter(build_filter())
2387        .build()
2388        .unwrap();
2389        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2390
2391        // disabling the predicate cache
2392        let reader_without_cache = TestReader::new(data);
2393        let requests_without_cache = reader_without_cache.requests.clone();
2394        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2395            reader_without_cache,
2396            reader_metadata,
2397        )
2398        .with_batch_size(1000)
2399        .with_row_selection(selection)
2400        .with_row_filter(build_filter())
2401        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
2402        .build()
2403        .unwrap();
2404        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2405
2406        assert_eq!(batches_with_cache, batches_without_cache);
2407
2408        let requests_with_cache = requests_with_cache.lock().unwrap();
2409        let requests_without_cache = requests_without_cache.lock().unwrap();
2410
2411        // fewer requests are made without the predicate cache
2412        assert_eq!(requests_with_cache.len(), 11);
2413        assert_eq!(requests_without_cache.len(), 2);
2414
2415        // fewer bytes are retrieved without the predicate cache
2416        assert_eq!(
2417            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2418            433
2419        );
2420        assert_eq!(
2421            requests_without_cache
2422                .iter()
2423                .map(|r| r.len())
2424                .sum::<usize>(),
2425            92
2426        );
2427    }
2428}