parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
61/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
62///
63/// Notes:
64///
65/// 1. There is a default implementation for types that implement [`AsyncRead`]
66///    and [`AsyncSeek`], for example [`tokio::fs::File`].
67///
68/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
69///    is enabled, implements this interface for [`ObjectStore`].
70///
71/// [`ObjectStore`]: object_store::ObjectStore
72///
73/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
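///
/// # Example
///
/// A minimal sketch of a custom implementation that serves all requests from an
/// in-memory buffer. The `InMemoryReader` type below is illustrative only and is
/// not part of this crate:
///
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// // Serves a Parquet file that has already been loaded into memory
/// struct InMemoryReader(Bytes);
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Slice the requested byte range out of the in-memory buffer
///         let data = self.0.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // A real implementation could return cached or pre-fetched metadata here
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&self.0)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
///
/// // The reader can now be passed to `ParquetRecordBatchStreamBuilder::new`
/// let _reader = InMemoryReader(Bytes::new());
/// ```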
74pub trait AsyncFileReader: Send {
75    /// Retrieve the bytes in `range`
76    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
78    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
79    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80        async move {
81            let mut result = Vec::with_capacity(ranges.len());
82
83            for range in ranges.into_iter() {
84                let data = self.get_bytes(range).await?;
85                result.push(data);
86            }
87
88            Ok(result)
89        }
90        .boxed()
91    }
92
93    /// Return a future that resolves to the [`ParquetMetaData`] for this Parquet file.
94    ///
95    /// This is an asynchronous operation as it may involve reading the file
96    /// footer and potentially other metadata from disk or a remote source.
97    ///
98    /// Reading data from Parquet requires the metadata to understand the
99    /// schema, row groups, and location of pages within the file. This metadata
100    /// is stored primarily in the footer of the Parquet file, and can be read using
101    /// [`ParquetMetaDataReader`].
102    ///
103    /// However, implementations can significantly speed up reading Parquet by
104    /// supplying cached metadata or pre-fetched metadata via this API.
105    ///
106    /// # Parameters
107    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
108    ///   and other options that affect how the metadata is read.
109    fn get_metadata<'a>(
110        &'a mut self,
111        options: Option<&'a ArrowReaderOptions>,
112    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
115/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
116impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118        self.as_mut().get_bytes(range)
119    }
120
121    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122        self.as_mut().get_byte_ranges(ranges)
123    }
124
125    fn get_metadata<'a>(
126        &'a mut self,
127        options: Option<&'a ArrowReaderOptions>,
128    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129        self.as_mut().get_metadata(options)
130    }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135        async move {
136            self.seek(SeekFrom::End(-(suffix as i64))).await?;
137            let mut buf = Vec::with_capacity(suffix);
138            self.take(suffix as _).read_to_end(&mut buf).await?;
139            Ok(buf.into())
140        }
141        .boxed()
142    }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147        async move {
148            self.seek(SeekFrom::Start(range.start)).await?;
149
150            let to_read = range.end - range.start;
151            let mut buffer = Vec::with_capacity(to_read.try_into()?);
152            let read = self.take(to_read).read_to_end(&mut buffer).await?;
153            if read as u64 != to_read {
154                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155            }
156
157            Ok(buffer.into())
158        }
159        .boxed()
160    }
161
162    fn get_metadata<'a>(
163        &'a mut self,
164        options: Option<&'a ArrowReaderOptions>,
165    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166        async move {
167            let metadata_opts = options.map(|o| o.metadata_options().clone());
168            let mut metadata_reader =
169                ParquetMetaDataReader::new().with_metadata_options(metadata_opts);
170
171            if let Some(opts) = options {
172                metadata_reader = metadata_reader
173                    .with_column_index_policy(opts.column_index_policy())
174                    .with_offset_index_policy(opts.offset_index_policy());
175            }
176
177            #[cfg(feature = "encryption")]
178            let metadata_reader = metadata_reader.with_decryption_properties(
179                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
180            );
181
182            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183            Ok(Arc::new(parquet_metadata))
184        }
185        .boxed()
186    }
187}
188
189impl ArrowReaderMetadata {
190    /// Returns a new [`ArrowReaderMetadata`] for this builder
191    ///
192    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
193    pub async fn load_async<T: AsyncFileReader>(
194        input: &mut T,
195        options: ArrowReaderOptions,
196    ) -> Result<Self> {
197        let metadata = input.get_metadata(Some(&options)).await?;
198        Self::try_new(metadata, options)
199    }
200}
201
202#[doc(hidden)]
203/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync readers from async
204///
205/// Allows sharing the same builder for different readers while keeping the same
206/// ParquetRecordBatchStreamBuilder API
207pub struct AsyncReader<T>(T);
208
209/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
210///
211/// This can be used to decode a Parquet file in streaming fashion (without
212/// downloading the whole file at once) from a remote source, such as an object store.
213///
214/// This builder handles reading the parquet file metadata, allowing consumers
215/// to use this information to select what specific columns, row groups, etc.
216/// they wish to be read by the resulting stream.
217///
218/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
219///
220/// See [`ArrowReaderBuilder`] for additional member functions
221pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
222
223impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
224    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
225    /// specified source.
226    ///
227    /// # Example
228    /// ```
229    /// # #[tokio::main(flavor="current_thread")]
230    /// # async fn main() {
231    /// #
232    /// # use arrow_array::RecordBatch;
233    /// # use arrow::util::pretty::pretty_format_batches;
234    /// # use futures::TryStreamExt;
235    /// #
236    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
237    /// #
238    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
239    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
240    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
241    /// #     assert_eq!(
242    /// #          &actual_lines, expected_lines,
243    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
244    /// #          expected_lines, actual_lines
245    /// #      );
246    /// #  }
247    /// #
248    /// # let testdata = arrow::util::test_util::parquet_test_data();
249    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
250    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
251    /// // another async I/O reader such as a reader from an object store.
252    /// let file = tokio::fs::File::open(path).await.unwrap();
253    ///
254    /// // Configure options for reading from the async source
255    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
256    ///     .await
257    ///     .unwrap();
258    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
259    /// // a stream that can be used to incrementally read the data in batches
260    /// let stream = builder.build().unwrap();
261    /// // In this example, we collect the stream into a Vec<RecordBatch>
262    /// // but real applications would likely process the batches as they are read
263    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
264    /// // Demonstrate the results are as expected
265    /// assert_batches_eq(
266    ///     &results,
267    ///     &[
268    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
269    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
270    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
271    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
272    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
273    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
274    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
275    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
276    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
277    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
278    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
279    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
280    ///      ],
281    ///  );
282    /// # }
283    /// ```
284    ///
285    /// # Example configuring options and reading metadata
286    ///
287    /// There are many options that control the behavior of the reader, such as
288    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
289    ///
290    /// ```
291    /// # #[tokio::main(flavor="current_thread")]
292    /// # async fn main() {
293    /// #
294    /// # use arrow_array::RecordBatch;
295    /// # use arrow::util::pretty::pretty_format_batches;
296    /// # use futures::TryStreamExt;
297    /// #
298    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
299    /// #
300    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
301    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
302    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
303    /// #     assert_eq!(
304    /// #          &actual_lines, expected_lines,
305    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
306    /// #          expected_lines, actual_lines
307    /// #      );
308    /// #  }
309    /// #
310    /// # let testdata = arrow::util::test_util::parquet_test_data();
311    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
312    /// // As before, use tokio::fs::File to read data using async I/O.
313    /// let file = tokio::fs::File::open(path).await.unwrap();
314    ///
315    /// // Configure options for reading from the async source; in this case we set the batch size
316    /// // to 3, which produces batches of at most 3 rows at a time.
317    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
318    ///     .await
319    ///     .unwrap()
320    ///     .with_batch_size(3);
321    ///
322    /// // We can also read the metadata to inspect the schema and other metadata
323    /// // before actually reading the data
324    /// let file_metadata = builder.metadata().file_metadata();
325    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
326    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
327    ///
328    /// let stream = builder.with_projection(mask).build().unwrap();
329    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
330    /// // Print out the results
331    /// assert_batches_eq(
332    ///     &results,
333    ///     &[
334    ///         "+----------+-------------+-----------+",
335    ///         "| bool_col | tinyint_col | float_col |",
336    ///         "+----------+-------------+-----------+",
337    ///         "| true     | 0           | 0.0       |",
338    ///         "| false    | 1           | 1.1       |",
339    ///         "| true     | 0           | 0.0       |",
340    ///         "| false    | 1           | 1.1       |",
341    ///         "| true     | 0           | 0.0       |",
342    ///         "| false    | 1           | 1.1       |",
343    ///         "| true     | 0           | 0.0       |",
344    ///         "| false    | 1           | 1.1       |",
345    ///         "+----------+-------------+-----------+",
346    ///      ],
347    ///  );
348    ///
349    /// // The results have 8 rows, and since we set the batch size to 3, we expect
350    /// // 3 batches: two with 3 rows each and the last batch with 2 rows.
351    /// assert_eq!(results.len(), 3);
352    /// # }
353    /// ```
354    pub async fn new(input: T) -> Result<Self> {
355        Self::new_with_options(input, Default::default()).await
356    }
357
358    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
359    /// and [`ArrowReaderOptions`].
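    ///
    /// # Example
    ///
    /// A sketch of configuring options before opening the file: here the page
    /// index is requested so that row selections can prune individual pages.
    /// The `file` argument stands in for any [`AsyncFileReader`], such as a
    /// `tokio::fs::File`.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::PageIndexPolicy;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// let options = ArrowReaderOptions::new()
    ///     .with_page_index_policy(PageIndexPolicy::Required);
    /// let builder =
    ///     ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
    /// # let _ = builder;
    /// # Ok(())
    /// # }
    /// ```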
360    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
361        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
362        Ok(Self::new_with_metadata(input, metadata))
363    }
364
365    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
366    ///
367    /// This allows loading the metadata once and using it to create multiple builders with
368    /// potentially different settings, which can then be read in parallel.
369    ///
370    /// # Example of reading from multiple streams in parallel
371    ///
372    /// ```
373    /// # use std::fs::metadata;
374    /// # use std::sync::Arc;
375    /// # use bytes::Bytes;
376    /// # use arrow_array::{Int32Array, RecordBatch};
377    /// # use arrow_schema::{DataType, Field, Schema};
378    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
379    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
380    /// # use tempfile::tempfile;
381    /// # use futures::StreamExt;
382    /// # #[tokio::main(flavor="current_thread")]
383    /// # async fn main() {
384    /// #
385    /// # let mut file = tempfile().unwrap();
386    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
387    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
388    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
389    /// # writer.write(&batch).unwrap();
390    /// # writer.close().unwrap();
391    /// // open file with parquet data
392    /// let mut file = tokio::fs::File::from_std(file);
393    /// // load metadata once
394    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
395    /// // create two readers, a and b, from the same underlying file
396    /// // without reading the metadata again
397    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
398    ///     file.try_clone().await.unwrap(),
399    ///     meta.clone()
400    /// ).build().unwrap();
401    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
402    ///
403    /// // Can read batches from both readers in parallel
404    /// assert_eq!(
405    ///   a.next().await.unwrap().unwrap(),
406    ///   b.next().await.unwrap().unwrap(),
407    /// );
408    /// # }
409    /// ```
410    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
411        Self::new_builder(AsyncReader(input), metadata)
412    }
413
414    /// Read bloom filter for a column in a row group
415    ///
416    /// Returns `None` if the column does not have a bloom filter
417    ///
418    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
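    ///
    /// # Example
    ///
    /// A sketch of consulting a bloom filter before deciding whether to scan a
    /// row group. The `file` argument stands in for any [`AsyncFileReader`],
    /// such as a `tokio::fs::File`.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// // Check the bloom filter of row group 0, column 0 for the value 42
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     if sbbf.check(&42_i64) {
    ///         println!("row group 0 may contain the value 42");
    ///     } else {
    ///         println!("row group 0 definitely does not contain the value 42");
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```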
419    pub async fn get_row_group_column_bloom_filter(
420        &mut self,
421        row_group_idx: usize,
422        column_idx: usize,
423    ) -> Result<Option<Sbbf>> {
424        let metadata = self.metadata.row_group(row_group_idx);
425        let column_metadata = metadata.column(column_idx);
426
427        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
428            offset
429                .try_into()
430                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
431        } else {
432            return Ok(None);
433        };
434
435        let buffer = match column_metadata.bloom_filter_length() {
436            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
437            None => self
438                .input
439                .0
440                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
441        }
442        .await?;
443
444        let (header, bitset_offset) =
445            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
446
447        match header.algorithm {
448            BloomFilterAlgorithm::BLOCK => {
449                // this match exists to future-proof the singleton algorithm enum
450            }
451        }
452        match header.compression {
453            BloomFilterCompression::UNCOMPRESSED => {
454                // this match exists to future-proof the singleton compression enum
455            }
456        }
457        match header.hash {
458            BloomFilterHash::XXHASH => {
459                // this match exists to future-proof the singleton hash enum
460            }
461        }
462
463        let bitset = match column_metadata.bloom_filter_length() {
464            Some(_) => buffer.slice(
465                (TryInto::<usize>::try_into(bitset_offset).unwrap()
466                    - TryInto::<usize>::try_into(offset).unwrap())..,
467            ),
468            None => {
469                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
470                    ParquetError::General("Bloom filter length is invalid".to_string())
471                })?;
472                self.input
473                    .0
474                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
475                    .await?
476            }
477        };
478        Ok(Some(Sbbf::new(&bitset)))
479    }
480
481    /// Build a new [`ParquetRecordBatchStream`]
482    ///
483    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
484    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
485        let Self {
486            input,
487            metadata,
488            schema,
489            fields,
490            batch_size,
491            row_groups,
492            projection,
493            filter,
494            selection,
495            row_selection_policy: selection_strategy,
496            limit,
497            offset,
498            metrics,
499            max_predicate_cache_size,
500        } = self;
501
502        // Ensure schema of ParquetRecordBatchStream respects projection, and does
503        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
504        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
505        let projected_fields = schema
506            .fields
507            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
508        let projected_schema = Arc::new(Schema::new(projected_fields));
509
510        let decoder = ParquetPushDecoderBuilder {
511            input: NoInput,
512            metadata,
513            schema,
514            fields,
515            projection,
516            filter,
517            selection,
518            row_selection_policy: selection_strategy,
519            batch_size,
520            row_groups,
521            limit,
522            offset,
523            metrics,
524            max_predicate_cache_size,
525        }
526        .build()?;
527
528        let request_state = RequestState::None { input: input.0 };
529
530        Ok(ParquetRecordBatchStream {
531            schema: projected_schema,
532            decoder,
533            request_state,
534        })
535    }
536}
537
538/// State machine that tracks outstanding requests to fetch data
539///
540/// The parameter `T` is the input, typically an `AsyncFileReader`
541enum RequestState<T> {
542    /// No outstanding requests
543    None {
544        input: T,
545    },
546    /// There is an outstanding request for data
547    Outstanding {
548        /// Ranges that have been requested
549        ranges: Vec<Range<u64>>,
550        /// Future that will resolve to `(input, data)` for the requested ranges
551        ///
552        /// Note the future owns the reader while the request is outstanding
553        /// and returns it upon completion
554        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
555    },
556    Done,
557}
558
559impl<T> RequestState<T>
560where
561    T: AsyncFileReader + Unpin + Send + 'static,
562{
563    /// Issue a request to fetch `ranges`, returning the Outstanding state
564    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
565        let ranges_captured = ranges.clone();
566
567        // Note this must move the input *into* the future
568        // because the get_byte_ranges future has a lifetime
569        // (aka can have references internally) and thus must
570        // own the input while the request is outstanding.
571        let future = async move {
572            let data = input.get_byte_ranges(ranges_captured).await?;
573            Ok((input, data))
574        }
575        .boxed();
576        RequestState::Outstanding { ranges, future }
577    }
578}
579
580impl<T> std::fmt::Debug for RequestState<T> {
581    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582        match self {
583            RequestState::None { input: _ } => f
584                .debug_struct("RequestState::None")
585                .field("input", &"...")
586                .finish(),
587            RequestState::Outstanding { ranges, .. } => f
588                .debug_struct("RequestState::Outstanding")
589                .field("ranges", &ranges)
590                .finish(),
591            RequestState::Done => {
592                write!(f, "RequestState::Done")
593            }
594        }
595    }
596}
597
598/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
599///
600/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
601/// allowing users to decode record batches separately from I/O.
602///
603/// # I/O Buffering
604///
605/// `ParquetRecordBatchStream` buffers *all* data pages that remain selected after
606/// projection and predicate evaluation, and decodes the rows from those buffered pages.
607///
608/// For example, if all rows and columns are selected, the entire row group is
609/// buffered in memory during decode. This minimizes the number of I/O operations
610/// required, which is especially important for object stores, where I/O operations
611/// have latencies in the hundreds of milliseconds.
612///
613/// See [`ParquetPushDecoderBuilder`] for an API with lower-level control over
614/// buffering.
615///
616/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
617pub struct ParquetRecordBatchStream<T> {
618    /// Output schema of the stream
619    schema: SchemaRef,
620    /// Input and outstanding I/O request, if any
621    request_state: RequestState<T>,
622    /// Decoding state machine (no IO)
623    decoder: ParquetPushDecoder,
624}
625
626impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
627    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
628        f.debug_struct("ParquetRecordBatchStream")
629            .field("request_state", &self.request_state)
630            .finish()
631    }
632}
633
634impl<T> ParquetRecordBatchStream<T> {
635    /// Returns the projected [`SchemaRef`] for reading the parquet file.
636    ///
637    /// Note that the schema metadata will be stripped here. See
638    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
639    pub fn schema(&self) -> &SchemaRef {
640        &self.schema
641    }
642}
643
644impl<T> ParquetRecordBatchStream<T>
645where
646    T: AsyncFileReader + Unpin + Send + 'static,
647{
648    /// Fetches the next row group from the stream.
649    ///
650    /// Users can continue to call this function to get row groups and decode them concurrently.
651    ///
652    /// ## Notes
653    ///
654    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
655    ///
656    /// ## Returns
657    ///
658    /// - `Ok(None)` if the stream has ended.
659    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
660    /// - `Ok(Some(reader))` which holds all the data for the row group.
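    ///
    /// ## Example
    ///
    /// A sketch of overlapping I/O and decode: each returned reader is decoded on
    /// a blocking task while the stream fetches the next row group. The `file`
    /// argument stands in for any [`AsyncFileReader`], such as a `tokio::fs::File`.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example(file: tokio::fs::File) -> parquet::errors::Result<()> {
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // `reader` holds all the data for one row group; decoding is CPU bound,
    ///     // so it can be moved off the async runtime
    ///     let handle = tokio::task::spawn_blocking(move || {
    ///         reader.collect::<Result<Vec<_>, _>>()
    ///     });
    ///     let batches = handle.await.expect("join error").expect("decode error");
    ///     println!("decoded {} batches", batches.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```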
661    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
662        loop {
663            // Take ownership of request state to process, leaving self in a
664            // valid state
665            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
666            match request_state {
667                // No outstanding requests, proceed to set up the next row group
668                RequestState::None { input } => {
669                    match self.decoder.try_next_reader()? {
670                        DecodeResult::NeedsData(ranges) => {
671                            self.request_state = RequestState::begin_request(input, ranges);
672                            continue; // poll again (as the input might be ready immediately)
673                        }
674                        DecodeResult::Data(reader) => {
675                            self.request_state = RequestState::None { input };
676                            return Ok(Some(reader));
677                        }
678                        DecodeResult::Finished => return Ok(None),
679                    }
680                }
681                RequestState::Outstanding { ranges, future } => {
682                    let (input, data) = future.await?;
683                    // Push the requested data to the decoder and try again
684                    self.decoder.push_ranges(ranges, data)?;
685                    self.request_state = RequestState::None { input };
686                    continue; // try and decode on next iteration
687                }
688                RequestState::Done => {
689                    self.request_state = RequestState::Done;
690                    return Ok(None);
691                }
692            }
693        }
694    }
695}
696
697impl<T> Stream for ParquetRecordBatchStream<T>
698where
699    T: AsyncFileReader + Unpin + Send + 'static,
700{
701    type Item = Result<RecordBatch>;
702    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
703        match self.poll_next_inner(cx) {
704            Ok(res) => {
705                // Successfully decoded a batch, or reached end of stream.
706                // convert Option<RecordBatch> to Option<Result<RecordBatch>>
707                res.map(|res| Ok(res).transpose())
708            }
709            Err(e) => {
710                self.request_state = RequestState::Done;
711                Poll::Ready(Some(Err(e)))
712            }
713        }
714    }
715}
716
717impl<T> ParquetRecordBatchStream<T>
718where
719    T: AsyncFileReader + Unpin + Send + 'static,
720{
721    /// Inner state machine
722    ///
723    /// Note this is separate from `poll_next` so we can use the `?` operator to check for errors,
724    /// as it returns `Result<Poll<Option<RecordBatch>>>`
725    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
726        loop {
727            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
728            match request_state {
729                RequestState::None { input } => {
730                    // No outstanding requests, proceed to decode the next batch
731                    match self.decoder.try_decode()? {
732                        DecodeResult::NeedsData(ranges) => {
733                            self.request_state = RequestState::begin_request(input, ranges);
734                            continue; // poll again (as the input might be ready immediately)
735                        }
736                        DecodeResult::Data(batch) => {
737                            self.request_state = RequestState::None { input };
738                            return Ok(Poll::Ready(Some(batch)));
739                        }
740                        DecodeResult::Finished => {
741                            self.request_state = RequestState::Done;
742                            return Ok(Poll::Ready(None));
743                        }
744                    }
745                }
746                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
747                    // Data was ready, push it to the decoder and continue
748                    Poll::Ready(result) => {
749                        let (input, data) = result?;
750                        // Push the requested data to the decoder
751                        self.decoder.push_ranges(ranges, data)?;
752                        self.request_state = RequestState::None { input };
753                        continue; // next iteration will try to decode the next batch
754                    }
755                    Poll::Pending => {
756                        self.request_state = RequestState::Outstanding { ranges, future };
757                        return Ok(Poll::Pending);
758                    }
759                },
760                RequestState::Done => {
761                    // Stream is done (error or end), return None
762                    self.request_state = RequestState::Done;
763                    return Ok(Poll::Ready(None));
764                }
765            }
766        }
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use crate::arrow::arrow_reader::RowSelectionPolicy;
774    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
775    use crate::arrow::arrow_reader::{
776        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
777    };
778    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
779    use crate::arrow::schema::virtual_type::RowNumber;
780    use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
781    use crate::file::metadata::PageIndexPolicy;
782    use crate::file::metadata::ParquetMetaDataReader;
783    use crate::file::properties::WriterProperties;
784    use arrow::compute::kernels::cmp::eq;
785    use arrow::compute::or;
786    use arrow::error::Result as ArrowResult;
787    use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
788    use arrow_array::cast::AsArray;
789    use arrow_array::types::Int32Type;
790    use arrow_array::{
791        Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
792        Scalar, StringArray, StructArray, UInt64Array,
793    };
794    use arrow_schema::{DataType, Field, Schema};
795    use arrow_select::concat::concat_batches;
796    use futures::{StreamExt, TryStreamExt};
797    use rand::{Rng, rng};
798    use std::collections::HashMap;
799    use std::sync::{Arc, Mutex};
800    use tempfile::tempfile;
801
802    #[derive(Clone)]
803    struct TestReader {
804        data: Bytes,
805        metadata: Option<Arc<ParquetMetaData>>,
806        requests: Arc<Mutex<Vec<Range<usize>>>>,
807    }
808
809    impl TestReader {
810        fn new(data: Bytes) -> Self {
811            Self {
812                data,
813                metadata: Default::default(),
814                requests: Default::default(),
815            }
816        }
817    }
818
819    impl AsyncFileReader for TestReader {
820        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
821            let range = range.clone();
822            self.requests
823                .lock()
824                .unwrap()
825                .push(range.start as usize..range.end as usize);
826            futures::future::ready(Ok(self
827                .data
828                .slice(range.start as usize..range.end as usize)))
829            .boxed()
830        }
831
832        fn get_metadata<'a>(
833            &'a mut self,
834            options: Option<&'a ArrowReaderOptions>,
835        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
836            let mut metadata_reader = ParquetMetaDataReader::new();
837            if let Some(opts) = options {
838                metadata_reader = metadata_reader
839                    .with_column_index_policy(opts.column_index_policy())
840                    .with_offset_index_policy(opts.offset_index_policy());
841            }
842            self.metadata = Some(Arc::new(
843                metadata_reader.parse_and_finish(&self.data).unwrap(),
844            ));
845            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
846        }
847    }
848
849    #[tokio::test]
850    async fn test_async_reader() {
851        let testdata = arrow::util::test_util::parquet_test_data();
852        let path = format!("{testdata}/alltypes_plain.parquet");
853        let data = Bytes::from(std::fs::read(path).unwrap());
854
855        let async_reader = TestReader::new(data.clone());
856
857        let requests = async_reader.requests.clone();
858        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
859            .await
860            .unwrap();
861
862        let metadata = builder.metadata().clone();
863        assert_eq!(metadata.num_row_groups(), 1);
864
865        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
866        let stream = builder
867            .with_projection(mask.clone())
868            .with_batch_size(1024)
869            .build()
870            .unwrap();
871
872        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
873
874        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
875            .unwrap()
876            .with_projection(mask)
877            .with_batch_size(104)
878            .build()
879            .unwrap()
880            .collect::<ArrowResult<Vec<_>>>()
881            .unwrap();
882
883        assert_eq!(async_batches, sync_batches);
884
885        let requests = requests.lock().unwrap();
886        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
887        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
888
889        assert_eq!(
890            &requests[..],
891            &[
892                offset_1 as usize..(offset_1 + length_1) as usize,
893                offset_2 as usize..(offset_2 + length_2) as usize
894            ]
895        );
896    }
897
898    #[tokio::test]
899    async fn test_async_reader_with_next_row_group() {
900        let testdata = arrow::util::test_util::parquet_test_data();
901        let path = format!("{testdata}/alltypes_plain.parquet");
902        let data = Bytes::from(std::fs::read(path).unwrap());
903
904        let async_reader = TestReader::new(data.clone());
905
906        let requests = async_reader.requests.clone();
907        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
908            .await
909            .unwrap();
910
911        let metadata = builder.metadata().clone();
912        assert_eq!(metadata.num_row_groups(), 1);
913
914        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
915        let mut stream = builder
916            .with_projection(mask.clone())
917            .with_batch_size(1024)
918            .build()
919            .unwrap();
920
921        let mut readers = vec![];
922        while let Some(reader) = stream.next_row_group().await.unwrap() {
923            readers.push(reader);
924        }
925
926        let async_batches: Vec<_> = readers
927            .into_iter()
928            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
929            .collect();
930
931        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
932            .unwrap()
933            .with_projection(mask)
934            .with_batch_size(104)
935            .build()
936            .unwrap()
937            .collect::<ArrowResult<Vec<_>>>()
938            .unwrap();
939
940        assert_eq!(async_batches, sync_batches);
941
942        let requests = requests.lock().unwrap();
943        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
944        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
945
946        assert_eq!(
947            &requests[..],
948            &[
949                offset_1 as usize..(offset_1 + length_1) as usize,
950                offset_2 as usize..(offset_2 + length_2) as usize
951            ]
952        );
953    }
954
955    #[tokio::test]
956    async fn test_async_reader_with_index() {
957        let testdata = arrow::util::test_util::parquet_test_data();
958        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
959        let data = Bytes::from(std::fs::read(path).unwrap());
960
961        let async_reader = TestReader::new(data.clone());
962
963        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
964        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
965            .await
966            .unwrap();
967
968        // The builder should have page and offset indexes loaded now
969        let metadata_with_index = builder.metadata();
970        assert_eq!(metadata_with_index.num_row_groups(), 1);
971
972        // Check offset indexes are present for all columns
973        let offset_index = metadata_with_index.offset_index().unwrap();
974        let column_index = metadata_with_index.column_index().unwrap();
975
976        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
977        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
978
979        let num_columns = metadata_with_index
980            .file_metadata()
981            .schema_descr()
982            .num_columns();
983
984        // Check page indexes are present for all columns
985        offset_index
986            .iter()
987            .for_each(|x| assert_eq!(x.len(), num_columns));
988        column_index
989            .iter()
990            .for_each(|x| assert_eq!(x.len(), num_columns));
991
992        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
993        let stream = builder
994            .with_projection(mask.clone())
995            .with_batch_size(1024)
996            .build()
997            .unwrap();
998
999        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1000
1001        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1002            .unwrap()
1003            .with_projection(mask)
1004            .with_batch_size(1024)
1005            .build()
1006            .unwrap()
1007            .collect::<ArrowResult<Vec<_>>>()
1008            .unwrap();
1009
1010        assert_eq!(async_batches, sync_batches);
1011    }
1012
1013    #[tokio::test]
1014    async fn test_async_reader_with_limit() {
1015        let testdata = arrow::util::test_util::parquet_test_data();
1016        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1017        let data = Bytes::from(std::fs::read(path).unwrap());
1018
1019        let metadata = ParquetMetaDataReader::new()
1020            .parse_and_finish(&data)
1021            .unwrap();
1022        let metadata = Arc::new(metadata);
1023
1024        assert_eq!(metadata.num_row_groups(), 1);
1025
1026        let async_reader = TestReader::new(data.clone());
1027
1028        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1029            .await
1030            .unwrap();
1031
1032        assert_eq!(builder.metadata().num_row_groups(), 1);
1033
1034        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1035        let stream = builder
1036            .with_projection(mask.clone())
1037            .with_batch_size(1024)
1038            .with_limit(1)
1039            .build()
1040            .unwrap();
1041
1042        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1043
1044        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1045            .unwrap()
1046            .with_projection(mask)
1047            .with_batch_size(1024)
1048            .with_limit(1)
1049            .build()
1050            .unwrap()
1051            .collect::<ArrowResult<Vec<_>>>()
1052            .unwrap();
1053
1054        assert_eq!(async_batches, sync_batches);
1055    }
1056
1057    #[tokio::test]
1058    async fn test_async_reader_skip_pages() {
1059        let testdata = arrow::util::test_util::parquet_test_data();
1060        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1061        let data = Bytes::from(std::fs::read(path).unwrap());
1062
1063        let async_reader = TestReader::new(data.clone());
1064
1065        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1066        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1067            .await
1068            .unwrap();
1069
1070        assert_eq!(builder.metadata().num_row_groups(), 1);
1071
1072        let selection = RowSelection::from(vec![
1073            RowSelector::skip(21),   // Skip first page
1074            RowSelector::select(21), // Select page to boundary
1075            RowSelector::skip(41),   // Skip multiple pages
1076            RowSelector::select(41), // Select multiple pages
1077            RowSelector::skip(25),   // Skip page across boundary
1078            RowSelector::select(25), // Select across page boundary
1079            RowSelector::skip(7116), // Skip to final page boundary
1080            RowSelector::select(10), // Select final page
1081        ]);
1082
1083        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1084
1085        let stream = builder
1086            .with_projection(mask.clone())
1087            .with_row_selection(selection.clone())
1088            .build()
1089            .expect("building stream");
1090
1091        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1092
1093        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1094            .unwrap()
1095            .with_projection(mask)
1096            .with_batch_size(1024)
1097            .with_row_selection(selection)
1098            .build()
1099            .unwrap()
1100            .collect::<ArrowResult<Vec<_>>>()
1101            .unwrap();
1102
1103        assert_eq!(async_batches, sync_batches);
1104    }
1105
1106    #[tokio::test]
1107    async fn test_fuzz_async_reader_selection() {
1108        let testdata = arrow::util::test_util::parquet_test_data();
1109        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1110        let data = Bytes::from(std::fs::read(path).unwrap());
1111
1112        let mut rand = rng();
1113
1114        for _ in 0..100 {
1115            let mut expected_rows = 0;
1116            let mut total_rows = 0;
1117            let mut skip = false;
1118            let mut selectors = vec![];
1119
1120            while total_rows < 7300 {
1121                let row_count: usize = rand.random_range(1..100);
1122
1123                let row_count = row_count.min(7300 - total_rows);
1124
1125                selectors.push(RowSelector { row_count, skip });
1126
1127                total_rows += row_count;
1128                if !skip {
1129                    expected_rows += row_count;
1130                }
1131
1132                skip = !skip;
1133            }
1134
1135            let selection = RowSelection::from(selectors);
1136
1137            let async_reader = TestReader::new(data.clone());
1138
1139            let options =
1140                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1141            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1142                .await
1143                .unwrap();
1144
1145            assert_eq!(builder.metadata().num_row_groups(), 1);
1146
1147            let col_idx: usize = rand.random_range(0..13);
1148            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1149
1150            let stream = builder
1151                .with_projection(mask.clone())
1152                .with_row_selection(selection.clone())
1153                .build()
1154                .expect("building stream");
1155
1156            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1157
1158            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1159
1160            assert_eq!(actual_rows, expected_rows);
1161        }
1162    }
1163
1164    #[tokio::test]
1165    async fn test_async_reader_zero_row_selector() {
1166        // See https://github.com/apache/arrow-rs/issues/2669
1167        let testdata = arrow::util::test_util::parquet_test_data();
1168        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1169        let data = Bytes::from(std::fs::read(path).unwrap());
1170
1171        let mut rand = rng();
1172
1173        let mut expected_rows = 0;
1174        let mut total_rows = 0;
1175        let mut skip = false;
1176        let mut selectors = vec![];
1177
1178        selectors.push(RowSelector {
1179            row_count: 0,
1180            skip: false,
1181        });
1182
1183        while total_rows < 7300 {
1184            let row_count: usize = rand.random_range(1..100);
1185
1186            let row_count = row_count.min(7300 - total_rows);
1187
1188            selectors.push(RowSelector { row_count, skip });
1189
1190            total_rows += row_count;
1191            if !skip {
1192                expected_rows += row_count;
1193            }
1194
1195            skip = !skip;
1196        }
1197
1198        let selection = RowSelection::from(selectors);
1199
1200        let async_reader = TestReader::new(data.clone());
1201
1202        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1203        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1204            .await
1205            .unwrap();
1206
1207        assert_eq!(builder.metadata().num_row_groups(), 1);
1208
1209        let col_idx: usize = rand.random_range(0..13);
1210        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1211
1212        let stream = builder
1213            .with_projection(mask.clone())
1214            .with_row_selection(selection.clone())
1215            .build()
1216            .expect("building stream");
1217
1218        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1219
1220        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1221
1222        assert_eq!(actual_rows, expected_rows);
1223    }
1224
1225    #[tokio::test]
1226    async fn test_row_filter_full_page_skip_is_handled_async() {
1227        let first_value: i64 = 1111;
1228        let last_value: i64 = 9999;
1229        let num_rows: usize = 12;
1230
1231        // build data with row selection average length 4
1232        // The result would be (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999)
1233        // The Row Selection would be [1111, (skip 10), 9999]
1234        let schema = Arc::new(Schema::new(vec![
1235            Field::new("key", DataType::Int64, false),
1236            Field::new("value", DataType::Int64, false),
1237        ]));
1238
1239        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
1240        int_values[0] = first_value;
1241        int_values[num_rows - 1] = last_value;
1242        let keys = Int64Array::from(int_values.clone());
1243        let values = Int64Array::from(int_values.clone());
1244        let batch = RecordBatch::try_new(
1245            Arc::clone(&schema),
1246            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
1247        )
1248        .unwrap();
1249
1250        let props = WriterProperties::builder()
1251            .set_write_batch_size(2)
1252            .set_data_page_row_count_limit(2)
1253            .build();
1254
1255        let mut buffer = Vec::new();
1256        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
1257        writer.write(&batch).unwrap();
1258        writer.close().unwrap();
1259        let data = Bytes::from(buffer);
1260
1261        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
1262            TestReader::new(data.clone()),
1263            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1264        )
1265        .await
1266        .unwrap();
1267        let schema = builder.parquet_schema().clone();
1268        let filter_mask = ProjectionMask::leaves(&schema, [0]);
1269
1270        let make_predicate = |mask: ProjectionMask| {
1271            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
1272                let column = batch.column(0);
1273                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
1274                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
1275                or(&match_first, &match_second)
1276            })
1277        };
1278
1279        let predicate = make_predicate(filter_mask.clone());
1280
1281        // The batch size is set to 12 to read all rows in one go after filtering.
1282        // If the reader chooses a mask to handle the filter, it might panic because the middle 4 pages may not be decoded.
1283        let stream = ParquetRecordBatchStreamBuilder::new_with_options(
1284            TestReader::new(data.clone()),
1285            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1286        )
1287        .await
1288        .unwrap()
1289        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1290        .with_batch_size(12)
1291        .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
1292        .build()
1293        .unwrap();
1294
1295        let schema = stream.schema().clone();
1296        let batches: Vec<_> = stream.try_collect().await.unwrap();
1297        let result = concat_batches(&schema, &batches).unwrap();
1298        assert_eq!(result.num_rows(), 2);
1299    }
1300
1301    #[tokio::test]
1302    async fn test_row_filter() {
1303        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1304        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1305        let data = RecordBatch::try_from_iter([
1306            ("a", Arc::new(a) as ArrayRef),
1307            ("b", Arc::new(b) as ArrayRef),
1308        ])
1309        .unwrap();
1310
1311        let mut buf = Vec::with_capacity(1024);
1312        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1313        writer.write(&data).unwrap();
1314        writer.close().unwrap();
1315
1316        let data: Bytes = buf.into();
1317        let metadata = ParquetMetaDataReader::new()
1318            .parse_and_finish(&data)
1319            .unwrap();
1320        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1321
1322        let test = TestReader::new(data);
1323        let requests = test.requests.clone();
1324
1325        let a_scalar = StringArray::from_iter_values(["b"]);
1326        let a_filter = ArrowPredicateFn::new(
1327            ProjectionMask::leaves(&parquet_schema, vec![0]),
1328            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1329        );
1330
1331        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1332
1333        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1334        let stream = ParquetRecordBatchStreamBuilder::new(test)
1335            .await
1336            .unwrap()
1337            .with_projection(mask.clone())
1338            .with_batch_size(1024)
1339            .with_row_filter(filter)
1340            .build()
1341            .unwrap();
1342
1343        let batches: Vec<_> = stream.try_collect().await.unwrap();
1344        assert_eq!(batches.len(), 1);
1345
1346        let batch = &batches[0];
1347        assert_eq!(batch.num_columns(), 2);
1348
1349        // Filter should have kept only rows with "b" in column 0
1350        assert_eq!(
1351            batch.column(0).as_ref(),
1352            &StringArray::from_iter_values(["b", "b", "b"])
1353        );
1354        assert_eq!(
1355            batch.column(1).as_ref(),
1356            &StringArray::from_iter_values(["2", "3", "4"])
1357        );
1358
1359        // Should only have made 2 requests:
1360        // * First request fetches data for evaluating the predicate
1361        // * Second request fetches data for evaluating the projection
1362        assert_eq!(requests.lock().unwrap().len(), 2);
1363    }
1364
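    // Two row filters evaluated in sequence, each reading only its own column before
    // the final projection is fetched.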
1365    #[tokio::test]
1366    async fn test_two_row_filters() {
1367        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1368        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1369        let c = Int32Array::from_iter(0..6);
1370        let data = RecordBatch::try_from_iter([
1371            ("a", Arc::new(a) as ArrayRef),
1372            ("b", Arc::new(b) as ArrayRef),
1373            ("c", Arc::new(c) as ArrayRef),
1374        ])
1375        .unwrap();
1376
1377        let mut buf = Vec::with_capacity(1024);
1378        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1379        writer.write(&data).unwrap();
1380        writer.close().unwrap();
1381
1382        let data: Bytes = buf.into();
1383        let metadata = ParquetMetaDataReader::new()
1384            .parse_and_finish(&data)
1385            .unwrap();
1386        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1387
1388        let test = TestReader::new(data);
1389        let requests = test.requests.clone();
1390
1391        let a_scalar = StringArray::from_iter_values(["b"]);
1392        let a_filter = ArrowPredicateFn::new(
1393            ProjectionMask::leaves(&parquet_schema, vec![0]),
1394            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1395        );
1396
1397        let b_scalar = StringArray::from_iter_values(["4"]);
1398        let b_filter = ArrowPredicateFn::new(
1399            ProjectionMask::leaves(&parquet_schema, vec![1]),
1400            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1401        );
1402
1403        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1404
1405        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1406        let stream = ParquetRecordBatchStreamBuilder::new(test)
1407            .await
1408            .unwrap()
1409            .with_projection(mask.clone())
1410            .with_batch_size(1024)
1411            .with_row_filter(filter)
1412            .build()
1413            .unwrap();
1414
1415        let batches: Vec<_> = stream.try_collect().await.unwrap();
1416        assert_eq!(batches.len(), 1);
1417
1418        let batch = &batches[0];
1419        assert_eq!(batch.num_rows(), 1);
1420        assert_eq!(batch.num_columns(), 2);
1421
1422        let col = batch.column(0);
1423        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1424        assert_eq!(val, "b");
1425
1426        let col = batch.column(1);
1427        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1428        assert_eq!(val, 3);
1429
1430        // Should only have made 3 requests
1431        // * First request fetches data for evaluating the first predicate
1432        // * Second request fetches data for evaluating the second predicate
1433        // * Third request fetches data for evaluating the projection
1434        assert_eq!(requests.lock().unwrap().len(), 3);
1435    }
1436
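    // Limit and offset applied across a file with two row groups of 3 rows each.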
1437    #[tokio::test]
1438    async fn test_limit_multiple_row_groups() {
1439        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1440        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1441        let c = Int32Array::from_iter(0..6);
1442        let data = RecordBatch::try_from_iter([
1443            ("a", Arc::new(a) as ArrayRef),
1444            ("b", Arc::new(b) as ArrayRef),
1445            ("c", Arc::new(c) as ArrayRef),
1446        ])
1447        .unwrap();
1448
1449        let mut buf = Vec::with_capacity(1024);
1450        let props = WriterProperties::builder()
1451            .set_max_row_group_size(3)
1452            .build();
1453        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1454        writer.write(&data).unwrap();
1455        writer.close().unwrap();
1456
1457        let data: Bytes = buf.into();
1458        let metadata = ParquetMetaDataReader::new()
1459            .parse_and_finish(&data)
1460            .unwrap();
1461
1462        assert_eq!(metadata.num_row_groups(), 2);
1463
1464        let test = TestReader::new(data);
1465
1466        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1467            .await
1468            .unwrap()
1469            .with_batch_size(1024)
1470            .with_limit(4)
1471            .build()
1472            .unwrap();
1473
1474        let batches: Vec<_> = stream.try_collect().await.unwrap();
1475        // Expect one batch for each row group
1476        assert_eq!(batches.len(), 2);
1477
1478        let batch = &batches[0];
        // First batch should contain all rows of the first row group
1480        assert_eq!(batch.num_rows(), 3);
1481        assert_eq!(batch.num_columns(), 3);
1482        let col2 = batch.column(2).as_primitive::<Int32Type>();
1483        assert_eq!(col2.values(), &[0, 1, 2]);
1484
1485        let batch = &batches[1];
1486        // Second batch should trigger the limit and only have one row
1487        assert_eq!(batch.num_rows(), 1);
1488        assert_eq!(batch.num_columns(), 3);
1489        let col2 = batch.column(2).as_primitive::<Int32Type>();
1490        assert_eq!(col2.values(), &[3]);
1491
1492        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1493            .await
1494            .unwrap()
1495            .with_offset(2)
1496            .with_limit(3)
1497            .build()
1498            .unwrap();
1499
1500        let batches: Vec<_> = stream.try_collect().await.unwrap();
1501        // Expect one batch for each row group
1502        assert_eq!(batches.len(), 2);
1503
1504        let batch = &batches[0];
1505        // First batch should contain one row
1506        assert_eq!(batch.num_rows(), 1);
1507        assert_eq!(batch.num_columns(), 3);
1508        let col2 = batch.column(2).as_primitive::<Int32Type>();
1509        assert_eq!(col2.values(), &[2]);
1510
1511        let batch = &batches[1];
1512        // Second batch should contain two rows
1513        assert_eq!(batch.num_rows(), 2);
1514        assert_eq!(batch.num_columns(), 3);
1515        let col2 = batch.column(2).as_primitive::<Int32Type>();
1516        assert_eq!(col2.values(), &[3, 4]);
1517
1518        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1519            .await
1520            .unwrap()
1521            .with_offset(4)
1522            .with_limit(20)
1523            .build()
1524            .unwrap();
1525
1526        let batches: Vec<_> = stream.try_collect().await.unwrap();
1527        // Should skip first row group
1528        assert_eq!(batches.len(), 1);
1529
1530        let batch = &batches[0];
1531        // First batch should contain two rows
1532        assert_eq!(batch.num_rows(), 2);
1533        assert_eq!(batch.num_columns(), 3);
1534        let col2 = batch.column(2).as_primitive::<Int32Type>();
1535        assert_eq!(col2.values(), &[4, 5]);
1536    }
1537
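    // Row filters combined with a required page index; verifies the expected number
    // of rows survive both predicates.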
1538    #[tokio::test]
1539    async fn test_row_filter_with_index() {
1540        let testdata = arrow::util::test_util::parquet_test_data();
1541        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1542        let data = Bytes::from(std::fs::read(path).unwrap());
1543
1544        let metadata = ParquetMetaDataReader::new()
1545            .parse_and_finish(&data)
1546            .unwrap();
1547        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1548
1549        assert_eq!(metadata.num_row_groups(), 1);
1550
1551        let async_reader = TestReader::new(data.clone());
1552
1553        let a_filter =
1554            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1555                Ok(batch.column(0).as_boolean().clone())
1556            });
1557
1558        let b_scalar = Int8Array::from(vec![2]);
1559        let b_filter = ArrowPredicateFn::new(
1560            ProjectionMask::leaves(&parquet_schema, vec![2]),
1561            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1562        );
1563
1564        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1565
1566        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1567
1568        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1569        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1570            .await
1571            .unwrap()
1572            .with_projection(mask.clone())
1573            .with_batch_size(1024)
1574            .with_row_filter(filter)
1575            .build()
1576            .unwrap();
1577
1578        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1579
1580        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1581
1582        assert_eq!(total_rows, 730);
1583    }
1584
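    // The configured batch size should be clamped to the number of rows in the file.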
1585    #[tokio::test]
1586    async fn test_batch_size_overallocate() {
1587        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only has 8 rows
1589        let path = format!("{testdata}/alltypes_plain.parquet");
1590        let data = Bytes::from(std::fs::read(path).unwrap());
1591
1592        let async_reader = TestReader::new(data.clone());
1593
1594        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1595            .await
1596            .unwrap();
1597
1598        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1599
1600        let builder = builder
1601            .with_projection(ProjectionMask::all())
1602            .with_batch_size(1024);
1603
        // Even though the batch size is set to 1024, it should be clamped to the
        // number of rows in the file (8)
1606        assert_ne!(1024, file_rows);
1607        assert_eq!(builder.batch_size, file_rows);
1608
1609        let _stream = builder.build().unwrap();
1610    }
1611
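    // Bloom filter lookup on a file whose metadata does not record bloom_filter_length.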
1612    #[tokio::test]
1613    async fn test_get_row_group_column_bloom_filter_without_length() {
1614        let testdata = arrow::util::test_util::parquet_test_data();
1615        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1616        let data = Bytes::from(std::fs::read(path).unwrap());
1617        test_get_row_group_column_bloom_filter(data, false).await;
1618    }
1619
1620    #[tokio::test]
1621    async fn test_parquet_record_batch_stream_schema() {
1622        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1623            schema.flattened_fields().iter().map(|f| f.name()).collect()
1624        }
1625
        // ParquetRecordBatchReaderBuilder::schema differs from
        // ParquetRecordBatchReader::schema and RecordBatch::schema in the contents of
        // the returned schema (the custom metadata attached to the schema, and which
        // fields are returned). This test ensures that behaviour remains consistent,
        // for both the synchronous and asynchronous readers.
1632
        // Prepare data for a schema with nested fields and custom metadata
1634        let mut metadata = HashMap::with_capacity(1);
1635        metadata.insert("key".to_string(), "value".to_string());
1636
1637        let nested_struct_array = StructArray::from(vec![
1638            (
1639                Arc::new(Field::new("d", DataType::Utf8, true)),
1640                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1641            ),
1642            (
1643                Arc::new(Field::new("e", DataType::Utf8, true)),
1644                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1645            ),
1646        ]);
1647        let struct_array = StructArray::from(vec![
1648            (
1649                Arc::new(Field::new("a", DataType::Int32, true)),
1650                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1651            ),
1652            (
1653                Arc::new(Field::new("b", DataType::UInt64, true)),
1654                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1655            ),
1656            (
1657                Arc::new(Field::new(
1658                    "c",
1659                    nested_struct_array.data_type().clone(),
1660                    true,
1661                )),
1662                Arc::new(nested_struct_array) as ArrayRef,
1663            ),
1664        ]);
1665
1666        let schema =
1667            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1668        let record_batch = RecordBatch::from(struct_array)
1669            .with_schema(schema.clone())
1670            .unwrap();
1671
1672        // Write parquet with custom metadata in schema
1673        let mut file = tempfile().unwrap();
1674        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1675        writer.write(&record_batch).unwrap();
1676        writer.close().unwrap();
1677
1678        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices in the mask, expected flattened field names in the output schema)
1680        let projections = [
1681            (vec![], vec![]),
1682            (vec![0], vec!["a"]),
1683            (vec![0, 1], vec!["a", "b"]),
1684            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1685            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1686        ];
1687
1688        // Ensure we're consistent for each of these projections
1689        for (indices, expected_projected_names) in projections {
1690            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1691                // Builder schema should preserve all fields and metadata
1692                assert_eq!(get_all_field_names(&builder), all_fields);
1693                assert_eq!(builder.metadata, metadata);
1694                // Reader & batch schema should show only projected fields, and no metadata
1695                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1696                assert_eq!(reader.metadata, HashMap::default());
1697                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1698                assert_eq!(batch.metadata, HashMap::default());
1699            };
1700
1701            let builder =
1702                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1703            let sync_builder_schema = builder.schema().clone();
1704            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1705            let mut reader = builder.with_projection(mask).build().unwrap();
1706            let sync_reader_schema = reader.schema();
1707            let batch = reader.next().unwrap().unwrap();
1708            let sync_batch_schema = batch.schema();
1709            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1710
            // The asynchronous version should behave the same
1712            let file = tokio::fs::File::from(file.try_clone().unwrap());
1713            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1714            let async_builder_schema = builder.schema().clone();
1715            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1716            let mut reader = builder.with_projection(mask).build().unwrap();
1717            let async_reader_schema = reader.schema().clone();
1718            let batch = reader.next().await.unwrap().unwrap();
1719            let async_batch_schema = batch.schema();
1720            assert_schemas(
1721                async_builder_schema,
1722                async_reader_schema,
1723                async_batch_schema,
1724            );
1725        }
1726    }
1727
1728    #[tokio::test]
1729    async fn test_get_row_group_column_bloom_filter_with_length() {
        // Convert to a new parquet file that records bloom_filter_length in its metadata
1731        let testdata = arrow::util::test_util::parquet_test_data();
1732        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1733        let data = Bytes::from(std::fs::read(path).unwrap());
1734        let async_reader = TestReader::new(data.clone());
1735        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1736            .await
1737            .unwrap();
1738        let schema = builder.schema().clone();
1739        let stream = builder.build().unwrap();
1740        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1741
1742        let mut parquet_data = Vec::new();
1743        let props = WriterProperties::builder()
1744            .set_bloom_filter_enabled(true)
1745            .build();
1746        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1747        for batch in batches {
1748            writer.write(&batch).unwrap();
1749        }
1750        writer.close().unwrap();
1751
1752        // test the new parquet file
1753        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1754    }
1755
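    // Shared helper: reads the bloom filter for row group 0, column 0 and checks
    // membership for a present and an absent value.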
1756    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1757        let async_reader = TestReader::new(data.clone());
1758
1759        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1760            .await
1761            .unwrap();
1762
1763        let metadata = builder.metadata();
1764        assert_eq!(metadata.num_row_groups(), 1);
1765        let row_group = metadata.row_group(0);
1766        let column = row_group.column(0);
1767        assert_eq!(column.bloom_filter_length().is_some(), with_length);
1768
1769        let sbbf = builder
1770            .get_row_group_column_bloom_filter(0, 0)
1771            .await
1772            .unwrap()
1773            .unwrap();
1774        assert!(sbbf.check(&"Hello"));
1775        assert!(!sbbf.check(&"Hello_Not_Exists"));
1776    }
1777
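    // Skipping rows within a nested list column across page boundaries; each
    // RowSelection below must yield exactly its selected row count.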
1778    #[tokio::test]
1779    async fn test_nested_skip() {
1780        let schema = Arc::new(Schema::new(vec![
1781            Field::new("col_1", DataType::UInt64, false),
1782            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1783        ]));
1784
        // Writer properties forcing small data pages (256 rows) and row groups (1024 rows)
1786        let props = WriterProperties::builder()
1787            .set_data_page_row_count_limit(256)
1788            .set_write_batch_size(256)
1789            .set_max_row_group_size(1024);
1790
1791        // Write data
1792        let mut file = tempfile().unwrap();
1793        let mut writer =
1794            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1795
1796        let mut builder = ListBuilder::new(StringBuilder::new());
1797        for id in 0..1024 {
1798            match id % 3 {
1799                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1800                1 => builder.append_value([Some(format!("id_{id}"))]),
1801                _ => builder.append_null(),
1802            }
1803        }
1804        let refs = vec![
1805            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1806            Arc::new(builder.finish()) as ArrayRef,
1807        ];
1808
1809        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1810        writer.write(&batch).unwrap();
1811        writer.close().unwrap();
1812
1813        let selections = [
1814            RowSelection::from(vec![
1815                RowSelector::skip(313),
1816                RowSelector::select(1),
1817                RowSelector::skip(709),
1818                RowSelector::select(1),
1819            ]),
1820            RowSelection::from(vec![
1821                RowSelector::skip(255),
1822                RowSelector::select(1),
1823                RowSelector::skip(767),
1824                RowSelector::select(1),
1825            ]),
1826            RowSelection::from(vec![
1827                RowSelector::select(255),
1828                RowSelector::skip(1),
1829                RowSelector::select(767),
1830                RowSelector::skip(1),
1831            ]),
1832            RowSelection::from(vec![
1833                RowSelector::skip(254),
1834                RowSelector::select(1),
1835                RowSelector::select(1),
1836                RowSelector::skip(767),
1837                RowSelector::select(1),
1838            ]),
1839        ];
1840
1841        for selection in selections {
1842            let expected = selection.row_count();
1843            // Read data
1844            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1845                tokio::fs::File::from_std(file.try_clone().unwrap()),
1846                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1847            )
1848            .await
1849            .unwrap();
1850
1851            reader = reader.with_row_selection(selection);
1852
1853            let mut stream = reader.build().unwrap();
1854
1855            let mut total_rows = 0;
1856            while let Some(rb) = stream.next().await {
1857                let rb = rb.unwrap();
1858                total_rows += rb.num_rows();
1859            }
1860            assert_eq!(total_rows, expected);
1861        }
1862    }
1863
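    // Row filters where the second predicate projects a nested struct column.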
1864    #[tokio::test]
1865    async fn test_row_filter_nested() {
1866        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1867        let b = StructArray::from(vec![
1868            (
1869                Arc::new(Field::new("aa", DataType::Utf8, true)),
1870                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1871            ),
1872            (
1873                Arc::new(Field::new("bb", DataType::Utf8, true)),
1874                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1875            ),
1876        ]);
1877        let c = Int32Array::from_iter(0..6);
1878        let data = RecordBatch::try_from_iter([
1879            ("a", Arc::new(a) as ArrayRef),
1880            ("b", Arc::new(b) as ArrayRef),
1881            ("c", Arc::new(c) as ArrayRef),
1882        ])
1883        .unwrap();
1884
1885        let mut buf = Vec::with_capacity(1024);
1886        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1887        writer.write(&data).unwrap();
1888        writer.close().unwrap();
1889
1890        let data: Bytes = buf.into();
1891        let metadata = ParquetMetaDataReader::new()
1892            .parse_and_finish(&data)
1893            .unwrap();
1894        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1895
1896        let test = TestReader::new(data);
1897        let requests = test.requests.clone();
1898
1899        let a_scalar = StringArray::from_iter_values(["b"]);
1900        let a_filter = ArrowPredicateFn::new(
1901            ProjectionMask::leaves(&parquet_schema, vec![0]),
1902            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1903        );
1904
1905        let b_scalar = StringArray::from_iter_values(["4"]);
1906        let b_filter = ArrowPredicateFn::new(
1907            ProjectionMask::leaves(&parquet_schema, vec![2]),
1908            move |batch| {
                // Filter on the `bb` field of the struct; it is the only leaf selected
                // by this predicate's projection, so it appears as column 0 here.
1910                let struct_array = batch
1911                    .column(0)
1912                    .as_any()
1913                    .downcast_ref::<StructArray>()
1914                    .unwrap();
1915                eq(struct_array.column(0), &Scalar::new(&b_scalar))
1916            },
1917        );
1918
1919        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1920
1921        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
1922        let stream = ParquetRecordBatchStreamBuilder::new(test)
1923            .await
1924            .unwrap()
1925            .with_projection(mask.clone())
1926            .with_batch_size(1024)
1927            .with_row_filter(filter)
1928            .build()
1929            .unwrap();
1930
1931        let batches: Vec<_> = stream.try_collect().await.unwrap();
1932        assert_eq!(batches.len(), 1);
1933
1934        let batch = &batches[0];
1935        assert_eq!(batch.num_rows(), 1);
1936        assert_eq!(batch.num_columns(), 2);
1937
1938        let col = batch.column(0);
1939        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1940        assert_eq!(val, "b");
1941
1942        let col = batch.column(1);
1943        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1944        assert_eq!(val, 3);
1945
1946        // Should only have made 3 requests
1947        // * First request fetches data for evaluating the first predicate
1948        // * Second request fetches data for evaluating the second predicate
1949        // * Third request fetches data for evaluating the projection
1950        assert_eq!(requests.lock().unwrap().len(), 3);
1951    }
1952
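    // Regression test: an explicitly empty offset index must not cause a panic when
    // reading row groups.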
1953    #[tokio::test]
1954    #[allow(deprecated)]
1955    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1956        use tokio::fs::File;
1957        let testdata = arrow::util::test_util::parquet_test_data();
1958        let path = format!("{testdata}/alltypes_plain.parquet");
1959        let mut file = File::open(&path).await.unwrap();
1960        let file_size = file.metadata().await.unwrap().len();
1961        let mut metadata = ParquetMetaDataReader::new()
1962            .with_page_indexes(true)
1963            .load_and_finish(&mut file, file_size)
1964            .await
1965            .unwrap();
1966
1967        metadata.set_offset_index(Some(vec![]));
1968        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1969        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1970        let reader =
1971            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1972                .build()
1973                .unwrap();
1974
1975        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1976        assert_eq!(result.len(), 1);
1977    }
1978
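    // Regression test: a populated (non-empty) offset index must not cause a panic
    // when reading row groups.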
1979    #[tokio::test]
1980    #[allow(deprecated)]
1981    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1982        use tokio::fs::File;
1983        let testdata = arrow::util::test_util::parquet_test_data();
1984        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1985        let mut file = File::open(&path).await.unwrap();
1986        let file_size = file.metadata().await.unwrap().len();
1987        let metadata = ParquetMetaDataReader::new()
1988            .with_page_indexes(true)
1989            .load_and_finish(&mut file, file_size)
1990            .await
1991            .unwrap();
1992
1993        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1994        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1995        let reader =
1996            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1997                .build()
1998                .unwrap();
1999
2000        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2001        assert_eq!(result.len(), 8);
2002    }
2003
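    // Regression test: metadata written out with ParquetMetaDataWriter and read back
    // must not cause a panic when fetching column chunks.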
2004    #[tokio::test]
2005    #[allow(deprecated)]
2006    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2007        use tempfile::TempDir;
2008        use tokio::fs::File;
2009        fn write_metadata_to_local_file(
2010            metadata: ParquetMetaData,
2011            file: impl AsRef<std::path::Path>,
2012        ) {
2013            use crate::file::metadata::ParquetMetaDataWriter;
2014            use std::fs::File;
2015            let file = File::create(file).unwrap();
2016            ParquetMetaDataWriter::new(file, &metadata)
2017                .finish()
2018                .unwrap()
2019        }
2020
2021        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2022            use std::fs::File;
2023            let file = File::open(file).unwrap();
2024            ParquetMetaDataReader::new()
2025                .with_page_indexes(true)
2026                .parse_and_finish(&file)
2027                .unwrap()
2028        }
2029
2030        let testdata = arrow::util::test_util::parquet_test_data();
2031        let path = format!("{testdata}/alltypes_plain.parquet");
2032        let mut file = File::open(&path).await.unwrap();
2033        let file_size = file.metadata().await.unwrap().len();
2034        let metadata = ParquetMetaDataReader::new()
2035            .with_page_indexes(true)
2036            .load_and_finish(&mut file, file_size)
2037            .await
2038            .unwrap();
2039
2040        let tempdir = TempDir::new().unwrap();
2041        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2042        write_metadata_to_local_file(metadata, &metadata_path);
2043        let metadata = read_metadata_from_local_file(&metadata_path);
2044
2045        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2046        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2047        let reader =
2048            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2049                .build()
2050                .unwrap();
2051
        // This previously panicked when collecting the stream; it should now succeed
2053        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2054        assert_eq!(result.len(), 1);
2055    }
2056
2057    #[tokio::test]
2058    async fn test_cached_array_reader_sparse_offset_error() {
2059        use futures::TryStreamExt;
2060
2061        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2062        use arrow_array::{BooleanArray, RecordBatch};
2063
2064        let testdata = arrow::util::test_util::parquet_test_data();
2065        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2066        let data = Bytes::from(std::fs::read(path).unwrap());
2067
2068        let async_reader = TestReader::new(data);
2069
2070        // Enable page index so the fetch logic loads only required pages
2071        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2072        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2073            .await
2074            .unwrap();
2075
2076        // Skip the first 22 rows (entire first Parquet page) and then select the
2077        // next 3 rows (22, 23, 24). This means the fetch step will not include
2078        // the first page starting at file offset 0.
2079        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2080
2081        // Trivial predicate on column 0 that always returns `true`. Using the
2082        // same column in both predicate and projection activates the caching
2083        // layer (Producer/Consumer pattern).
2084        let parquet_schema = builder.parquet_schema();
2085        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2086        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2087            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2088        });
2089        let filter = RowFilter::new(vec![Box::new(always_true)]);
2090
2091        // Build the stream with batch size 8 so the cache reads whole batches
2092        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2093        let stream = builder
2094            .with_batch_size(8)
2095            .with_projection(proj)
2096            .with_row_selection(selection)
2097            .with_row_filter(filter)
2098            .build()
2099            .unwrap();
2100
        // Collecting the stream previously failed with a sparse column chunk offset
        // error; it should now succeed.
2103        let _result: Vec<_> = stream.try_collect().await.unwrap();
2104    }
2105
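    // Compares reads with the predicate cache enabled (the default) and disabled via
    // with_max_predicate_cache_size(0): results must match while the I/O patterns differ.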
2106    #[tokio::test]
2107    async fn test_predicate_cache_disabled() {
2108        let k = Int32Array::from_iter_values(0..10);
2109        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2110
2111        let mut buf = Vec::new();
2112        // both the page row limit and batch size are set to 1 to create one page per row
2113        let props = WriterProperties::builder()
2114            .set_data_page_row_count_limit(1)
2115            .set_write_batch_size(1)
2116            .set_max_row_group_size(10)
2117            .set_write_page_header_statistics(true)
2118            .build();
2119        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2120        writer.write(&data).unwrap();
2121        writer.close().unwrap();
2122
2123        let data = Bytes::from(buf);
2124        let metadata = ParquetMetaDataReader::new()
2125            .with_page_index_policy(PageIndexPolicy::Required)
2126            .parse_and_finish(&data)
2127            .unwrap();
2128        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2129
        // The filter is not cloneable, so use a closure to build a fresh one for each reader
2131        let build_filter = || {
2132            let scalar = Int32Array::from_iter_values([5]);
2133            let predicate = ArrowPredicateFn::new(
2134                ProjectionMask::leaves(&parquet_schema, vec![0]),
2135                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2136            );
2137            RowFilter::new(vec![Box::new(predicate)])
2138        };
2139
2140        // select only one of the pages
2141        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2142
2143        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2144        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2145
2146        // using the predicate cache (default)
2147        let reader_with_cache = TestReader::new(data.clone());
2148        let requests_with_cache = reader_with_cache.requests.clone();
2149        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2150            reader_with_cache,
2151            reader_metadata.clone(),
2152        )
2153        .with_batch_size(1000)
2154        .with_row_selection(selection.clone())
2155        .with_row_filter(build_filter())
2156        .build()
2157        .unwrap();
2158        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2159
2160        // disabling the predicate cache
2161        let reader_without_cache = TestReader::new(data);
2162        let requests_without_cache = reader_without_cache.requests.clone();
2163        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2164            reader_without_cache,
2165            reader_metadata,
2166        )
2167        .with_batch_size(1000)
2168        .with_row_selection(selection)
2169        .with_row_filter(build_filter())
2170        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
2171        .build()
2172        .unwrap();
2173        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2174
2175        assert_eq!(batches_with_cache, batches_without_cache);
2176
2177        let requests_with_cache = requests_with_cache.lock().unwrap();
2178        let requests_without_cache = requests_without_cache.lock().unwrap();
2179
        // fewer requests will be made without the predicate cache
2181        assert_eq!(requests_with_cache.len(), 11);
2182        assert_eq!(requests_without_cache.len(), 2);
2183
        // fewer bytes will be retrieved without the predicate cache
2185        assert_eq!(
2186            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2187            433
2188        );
2189        assert_eq!(
2190            requests_without_cache
2191                .iter()
2192                .map(|r| r.len())
2193                .sum::<usize>(),
2194            92
2195        );
2196    }
2197
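    // Row numbers exposed as a virtual column, read asynchronously across multiple
    // row groups.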
2198    #[test]
2199    fn test_row_numbers_with_multiple_row_groups() {
2200        test_row_numbers_with_multiple_row_groups_helper(
2201            false,
2202            |path, selection, _row_filter, batch_size| {
2203                let runtime = tokio::runtime::Builder::new_current_thread()
2204                    .enable_all()
2205                    .build()
2206                    .expect("Could not create runtime");
2207                runtime.block_on(async move {
2208                    let file = tokio::fs::File::open(path).await.unwrap();
2209                    let row_number_field = Arc::new(
2210                        Field::new("row_number", DataType::Int64, false)
2211                            .with_extension_type(RowNumber),
2212                    );
2213                    let options = ArrowReaderOptions::new()
2214                        .with_virtual_columns(vec![row_number_field])
2215                        .unwrap();
2216                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2217                        .await
2218                        .unwrap()
2219                        .with_row_selection(selection)
2220                        .with_batch_size(batch_size)
2221                        .build()
2222                        .expect("Could not create reader");
2223                    reader.try_collect::<Vec<_>>().await.unwrap()
2224                })
2225            },
2226        );
2227    }
2228
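    // As above, but additionally applying a row filter.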
2229    #[test]
2230    fn test_row_numbers_with_multiple_row_groups_and_filter() {
2231        test_row_numbers_with_multiple_row_groups_helper(
2232            true,
2233            |path, selection, row_filter, batch_size| {
2234                let runtime = tokio::runtime::Builder::new_current_thread()
2235                    .enable_all()
2236                    .build()
2237                    .expect("Could not create runtime");
2238                runtime.block_on(async move {
2239                    let file = tokio::fs::File::open(path).await.unwrap();
2240                    let row_number_field = Arc::new(
2241                        Field::new("row_number", DataType::Int64, false)
2242                            .with_extension_type(RowNumber),
2243                    );
2244                    let options = ArrowReaderOptions::new()
2245                        .with_virtual_columns(vec![row_number_field])
2246                        .unwrap();
2247                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2248                        .await
2249                        .unwrap()
2250                        .with_row_selection(selection)
2251                        .with_row_filter(row_filter.expect("No row filter"))
2252                        .with_batch_size(batch_size)
2253                        .build()
2254                        .expect("Could not create reader");
2255                    reader.try_collect::<Vec<_>>().await.unwrap()
2256                })
2257            },
2258        );
2259    }
2260
2261    #[tokio::test]
2262    async fn test_nested_lists() -> Result<()> {
2263        // Test case for https://github.com/apache/arrow-rs/issues/8657
2264        let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
2265        let table_schema = Arc::new(Schema::new(vec![
2266            Field::new("id", DataType::Int32, false),
2267            Field::new("vector", DataType::List(list_inner_field.clone()), true),
2268        ]));
2269
2270        let mut list_builder =
2271            ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
2272        list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
2273        list_builder.append(true);
2274        list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
2275        list_builder.append(true);
2276        list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
2277        list_builder.append(true);
2278        list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
2279        list_builder.append(true);
2280        let list_array = list_builder.finish();
2281
2282        let data = vec![RecordBatch::try_new(
2283            table_schema.clone(),
2284            vec![
2285                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
2286                Arc::new(list_array),
2287            ],
2288        )?];
2289
2290        let mut buffer = Vec::new();
2291        let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
2292
2293        for batch in data {
2294            writer.write(&batch).await?;
2295        }
2296
2297        writer.close().await?;
2298
2299        let reader = TestReader::new(Bytes::from(buffer));
2300        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
2301
2302        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
2303            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2304        });
2305
2306        let projection_mask = ProjectionMask::all();
2307
2308        let mut stream = builder
2309            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
2310            .with_projection(projection_mask)
2311            .build()?;
2312
2313        while let Some(batch) = stream.next().await {
2314            let _ = batch.unwrap(); // ensure there is no panic
2315        }
2316
2317        Ok(())
2318    }
2319}