parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
61/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
62///
63/// Notes:
64///
65/// 1. There is a default implementation for types that implement [`AsyncRead`]
66///    and [`AsyncSeek`], for example [`tokio::fs::File`].
67///
68/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
69///    is enabled, implements this interface for [`ObjectStore`].
70///
71/// [`ObjectStore`]: object_store::ObjectStore
72///
73/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
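///
/// # Example
///
/// A minimal sketch (for illustration only, not part of this crate) of
/// implementing this trait over an in-memory [`Bytes`] buffer. The
/// `InMemoryReader` type below is hypothetical:
///
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// // Serves requests from a fully buffered copy of the file (hypothetical type)
/// struct InMemoryReader(Bytes);
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Slice the requested byte range out of the in-memory buffer
///         let data = self.0.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // Parse the footer from the buffered bytes on every call
///         let data = self.0.clone();
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```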
74pub trait AsyncFileReader: Send {
75    /// Retrieve the bytes in `range`
76    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
78    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
79    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80        async move {
81            let mut result = Vec::with_capacity(ranges.len());
82
83            for range in ranges.into_iter() {
84                let data = self.get_bytes(range).await?;
85                result.push(data);
86            }
87
88            Ok(result)
89        }
90        .boxed()
91    }
92
93    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
94    ///
95    /// This is an asynchronous operation as it may involve reading the file
96    /// footer and potentially other metadata from disk or a remote source.
97    ///
98    /// Reading data from Parquet requires the metadata to understand the
99    /// schema, row groups, and location of pages within the file. This metadata
100    /// is stored primarily in the footer of the Parquet file, and can be read using
101    /// [`ParquetMetaDataReader`].
102    ///
103    /// However, implementations can significantly speed up reading Parquet by
104    /// supplying cached metadata or pre-fetched metadata via this API.
105    ///
106    /// # Parameters
107    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
108    ///   and other options that affect how the metadata is read.
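    ///
    /// # Example: serving cached metadata
    ///
    /// A minimal sketch (the `CachedMetadataReader` wrapper is assumed for
    /// illustration, not part of this crate) of an implementation that returns
    /// previously loaded metadata instead of re-reading the footer:
    ///
    /// ```
    /// # use std::ops::Range;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use futures::future::{BoxFuture, FutureExt};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::async_reader::AsyncFileReader;
    /// # use parquet::errors::Result;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// // Hypothetical wrapper holding metadata loaded ahead of time
    /// struct CachedMetadataReader<R> {
    ///     inner: R,
    ///     metadata: Arc<ParquetMetaData>,
    /// }
    ///
    /// impl<R: AsyncFileReader> AsyncFileReader for CachedMetadataReader<R> {
    ///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
    ///         // Delegate data reads to the wrapped reader
    ///         self.inner.get_bytes(range)
    ///     }
    ///
    ///     fn get_metadata<'a>(
    ///         &'a mut self,
    ///         _options: Option<&'a ArrowReaderOptions>,
    ///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///         // Return the cached copy without performing any I/O
    ///         let metadata = Arc::clone(&self.metadata);
    ///         async move { Ok(metadata) }.boxed()
    ///     }
    /// }
    /// ```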
109    fn get_metadata<'a>(
110        &'a mut self,
111        options: Option<&'a ArrowReaderOptions>,
112    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
/// This implementation allows a `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
116impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118        self.as_mut().get_bytes(range)
119    }
120
121    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122        self.as_mut().get_byte_ranges(ranges)
123    }
124
125    fn get_metadata<'a>(
126        &'a mut self,
127        options: Option<&'a ArrowReaderOptions>,
128    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129        self.as_mut().get_metadata(options)
130    }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135        async move {
136            self.seek(SeekFrom::End(-(suffix as i64))).await?;
137            let mut buf = Vec::with_capacity(suffix);
138            self.take(suffix as _).read_to_end(&mut buf).await?;
139            Ok(buf.into())
140        }
141        .boxed()
142    }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147        async move {
148            self.seek(SeekFrom::Start(range.start)).await?;
149
150            let to_read = range.end - range.start;
151            let mut buffer = Vec::with_capacity(to_read.try_into()?);
152            let read = self.take(to_read).read_to_end(&mut buffer).await?;
153            if read as u64 != to_read {
154                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155            }
156
157            Ok(buffer.into())
158        }
159        .boxed()
160    }
161
162    fn get_metadata<'a>(
163        &'a mut self,
164        options: Option<&'a ArrowReaderOptions>,
165    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166        async move {
167            let metadata_opts = options.map(|o| o.metadata_options().clone());
168            let mut metadata_reader =
169                ParquetMetaDataReader::new().with_metadata_options(metadata_opts);
170
171            if let Some(opts) = options {
172                metadata_reader = metadata_reader
173                    .with_column_index_policy(opts.column_index_policy())
174                    .with_offset_index_policy(opts.offset_index_policy());
175            }
176
177            #[cfg(feature = "encryption")]
178            let metadata_reader = metadata_reader.with_decryption_properties(
179                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
180            );
181
182            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183            Ok(Arc::new(parquet_metadata))
184        }
185        .boxed()
186    }
187}
188
189impl ArrowReaderMetadata {
190    /// Returns a new [`ArrowReaderMetadata`] for this builder
191    ///
192    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
193    pub async fn load_async<T: AsyncFileReader>(
194        input: &mut T,
195        options: ArrowReaderOptions,
196    ) -> Result<Self> {
197        let metadata = input.get_metadata(Some(&options)).await?;
198        Self::try_new(metadata, options)
199    }
200}
201
202#[doc(hidden)]
203/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync readers from async
204///
205/// Allows sharing the same builder for different readers while keeping the same
206/// ParquetRecordBatchStreamBuilder API
207pub struct AsyncReader<T>(T);
208
/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
210///
211/// This can be used to decode a Parquet file in streaming fashion (without
212/// downloading the whole file at once) from a remote source, such as an object store.
213///
214/// This builder handles reading the parquet file metadata, allowing consumers
215/// to use this information to select what specific columns, row groups, etc.
216/// they wish to be read by the resulting stream.
217///
218/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
219///
220/// See [`ArrowReaderBuilder`] for additional member functions
221pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
222
223impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
224    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
225    /// specified source.
226    ///
227    /// # Example
228    /// ```
229    /// # #[tokio::main(flavor="current_thread")]
230    /// # async fn main() {
231    /// #
232    /// # use arrow_array::RecordBatch;
233    /// # use arrow::util::pretty::pretty_format_batches;
234    /// # use futures::TryStreamExt;
235    /// #
236    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
237    /// #
238    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
239    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
240    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
241    /// #     assert_eq!(
242    /// #          &actual_lines, expected_lines,
243    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
244    /// #          expected_lines, actual_lines
245    /// #      );
246    /// #  }
247    /// #
248    /// # let testdata = arrow::util::test_util::parquet_test_data();
249    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
    /// // another async I/O reader, such as a reader from an object store.
252    /// let file = tokio::fs::File::open(path).await.unwrap();
253    ///
254    /// // Configure options for reading from the async source
255    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
256    ///     .await
257    ///     .unwrap();
258    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
259    /// // a stream that can be used to incrementally read the data in batches
260    /// let stream = builder.build().unwrap();
261    /// // In this example, we collect the stream into a Vec<RecordBatch>
262    /// // but real applications would likely process the batches as they are read
263    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
264    /// // Demonstrate the results are as expected
265    /// assert_batches_eq(
266    ///     &results,
267    ///     &[
268    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
269    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
270    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
271    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
272    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
273    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
274    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
275    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
276    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
277    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
278    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
279    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
280    ///      ],
281    ///  );
282    /// # }
283    /// ```
284    ///
285    /// # Example configuring options and reading metadata
286    ///
287    /// There are many options that control the behavior of the reader, such as
288    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
289    ///
290    /// ```
291    /// # #[tokio::main(flavor="current_thread")]
292    /// # async fn main() {
293    /// #
294    /// # use arrow_array::RecordBatch;
295    /// # use arrow::util::pretty::pretty_format_batches;
296    /// # use futures::TryStreamExt;
297    /// #
298    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
299    /// #
300    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
301    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
302    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
303    /// #     assert_eq!(
304    /// #          &actual_lines, expected_lines,
305    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
306    /// #          expected_lines, actual_lines
307    /// #      );
308    /// #  }
309    /// #
310    /// # let testdata = arrow::util::test_util::parquet_test_data();
311    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // As before, use tokio::fs::File to read data using async I/O.
313    /// let file = tokio::fs::File::open(path).await.unwrap();
314    ///
    /// // Configure options for reading from the async source. In this case we set the batch size
    /// // to 3, which produces batches of up to 3 rows at a time.
317    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
318    ///     .await
319    ///     .unwrap()
320    ///     .with_batch_size(3);
321    ///
322    /// // We can also read the metadata to inspect the schema and other metadata
323    /// // before actually reading the data
324    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read columns 1, 2 and 6 (bool_col, tinyint_col and float_col)
326    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
327    ///
328    /// let stream = builder.with_projection(mask).build().unwrap();
329    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Check the results are as expected
331    /// assert_batches_eq(
332    ///     &results,
333    ///     &[
334    ///         "+----------+-------------+-----------+",
335    ///         "| bool_col | tinyint_col | float_col |",
336    ///         "+----------+-------------+-----------+",
337    ///         "| true     | 0           | 0.0       |",
338    ///         "| false    | 1           | 1.1       |",
339    ///         "| true     | 0           | 0.0       |",
340    ///         "| false    | 1           | 1.1       |",
341    ///         "| true     | 0           | 0.0       |",
342    ///         "| false    | 1           | 1.1       |",
343    ///         "| true     | 0           | 0.0       |",
344    ///         "| false    | 1           | 1.1       |",
345    ///         "+----------+-------------+-----------+",
346    ///      ],
347    ///  );
348    ///
    /// // The results have 8 rows, so with the batch size set to 3 we expect
    /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
351    /// assert_eq!(results.len(), 3);
352    /// # }
353    /// ```
354    pub async fn new(input: T) -> Result<Self> {
355        Self::new_with_options(input, Default::default()).await
356    }
357
358    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
359    /// and [`ArrowReaderOptions`].
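    ///
    /// # Example
    ///
    /// A brief sketch of supplying options when creating the builder; the test
    /// data path and the chosen page index policy are for illustration only:
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::PageIndexPolicy;
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// let file = tokio::fs::File::open(path).await.unwrap();
    /// // Request that the page index be read along with the footer metadata, if present
    /// let options = ArrowReaderOptions::new()
    ///     .with_page_index_policy(PageIndexPolicy::Optional);
    /// let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
    ///     .await
    ///     .unwrap();
    /// assert_eq!(builder.metadata().num_row_groups(), 1);
    /// # }
    /// ```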
360    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
361        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
362        Ok(Self::new_with_metadata(input, metadata))
363    }
364
365    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
366    ///
    /// This allows loading the metadata once and using it to create multiple builders with
    /// potentially different settings, which can then be read in parallel.
369    ///
370    /// # Example of reading from multiple streams in parallel
371    ///
372    /// ```
373    /// # use std::fs::metadata;
374    /// # use std::sync::Arc;
375    /// # use bytes::Bytes;
376    /// # use arrow_array::{Int32Array, RecordBatch};
377    /// # use arrow_schema::{DataType, Field, Schema};
378    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
379    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
380    /// # use tempfile::tempfile;
381    /// # use futures::StreamExt;
382    /// # #[tokio::main(flavor="current_thread")]
383    /// # async fn main() {
384    /// #
385    /// # let mut file = tempfile().unwrap();
386    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
387    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
388    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
389    /// # writer.write(&batch).unwrap();
390    /// # writer.close().unwrap();
391    /// // open file with parquet data
392    /// let mut file = tokio::fs::File::from_std(file);
393    /// // load metadata once
394    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
395    /// // create two readers, a and b, from the same underlying file
396    /// // without reading the metadata again
397    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
398    ///     file.try_clone().await.unwrap(),
399    ///     meta.clone()
400    /// ).build().unwrap();
401    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
402    ///
403    /// // Can read batches from both readers in parallel
404    /// assert_eq!(
405    ///   a.next().await.unwrap().unwrap(),
406    ///   b.next().await.unwrap().unwrap(),
407    /// );
408    /// # }
409    /// ```
410    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
411        Self::new_builder(AsyncReader(input), metadata)
412    }
413
414    /// Read bloom filter for a column in a row group
415    ///
416    /// Returns `None` if the column does not have a bloom filter
417    ///
    /// Call this function after applying other forms of pruning, such as projection and predicate pushdown.
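    ///
    /// # Example
    ///
    /// A sketch of probing a column's bloom filter; the hidden setup writes a
    /// small illustrative file with bloom filters enabled:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
    /// # use parquet::file::properties::WriterProperties;
    /// # use tempfile::tempfile;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # let mut file = tempfile().unwrap();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let props = WriterProperties::builder().set_bloom_filter_enabled(true).build();
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(props)).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let file = tokio::fs::File::from_std(file);
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    /// // Fetch the bloom filter for row group 0, column 0
    /// let sbbf = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .await
    ///     .unwrap();
    /// assert!(sbbf.is_some());
    /// # }
    /// ```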
419    pub async fn get_row_group_column_bloom_filter(
420        &mut self,
421        row_group_idx: usize,
422        column_idx: usize,
423    ) -> Result<Option<Sbbf>> {
424        let metadata = self.metadata.row_group(row_group_idx);
425        let column_metadata = metadata.column(column_idx);
426
427        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
428            offset
429                .try_into()
430                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
431        } else {
432            return Ok(None);
433        };
434
435        let buffer = match column_metadata.bloom_filter_length() {
436            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
437            None => self
438                .input
439                .0
440                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
441        }
442        .await?;
443
444        let (header, bitset_offset) =
445            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
446
447        match header.algorithm {
448            BloomFilterAlgorithm::BLOCK => {
449                // this match exists to future proof the singleton algorithm enum
450            }
451        }
452        match header.compression {
453            BloomFilterCompression::UNCOMPRESSED => {
454                // this match exists to future proof the singleton compression enum
455            }
456        }
457        match header.hash {
458            BloomFilterHash::XXHASH => {
459                // this match exists to future proof the singleton hash enum
460            }
461        }
462
463        let bitset = match column_metadata.bloom_filter_length() {
464            Some(_) => buffer.slice(
465                (TryInto::<usize>::try_into(bitset_offset).unwrap()
466                    - TryInto::<usize>::try_into(offset).unwrap())..,
467            ),
468            None => {
469                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
470                    ParquetError::General("Bloom filter length is invalid".to_string())
471                })?;
472                self.input
473                    .0
474                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
475                    .await?
476            }
477        };
478        Ok(Some(Sbbf::new(&bitset)))
479    }
480
481    /// Build a new [`ParquetRecordBatchStream`]
482    ///
483    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
484    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
485        let Self {
486            input,
487            metadata,
488            schema,
489            fields,
490            batch_size,
491            row_groups,
492            projection,
493            filter,
494            selection,
495            row_selection_policy: selection_strategy,
496            limit,
497            offset,
498            metrics,
499            max_predicate_cache_size,
500        } = self;
501
502        // Ensure schema of ParquetRecordBatchStream respects projection, and does
503        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
504        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
505        let projected_fields = schema
506            .fields
507            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
508        let projected_schema = Arc::new(Schema::new(projected_fields));
509
510        let decoder = ParquetPushDecoderBuilder {
511            input: NoInput,
512            metadata,
513            schema,
514            fields,
515            projection,
516            filter,
517            selection,
518            row_selection_policy: selection_strategy,
519            batch_size,
520            row_groups,
521            limit,
522            offset,
523            metrics,
524            max_predicate_cache_size,
525        }
526        .build()?;
527
528        let request_state = RequestState::None { input: input.0 };
529
530        Ok(ParquetRecordBatchStream {
531            schema: projected_schema,
532            decoder,
533            request_state,
534        })
535    }
536}
537
538/// State machine that tracks outstanding requests to fetch data
539///
540/// The parameter `T` is the input, typically an `AsyncFileReader`
541enum RequestState<T> {
542    /// No outstanding requests
543    None {
544        input: T,
545    },
546    /// There is an outstanding request for data
547    Outstanding {
548        /// Ranges that have been requested
549        ranges: Vec<Range<u64>>,
        /// Future that resolves to `(input, data)` for the requested ranges
551        ///
552        /// Note the future owns the reader while the request is outstanding
553        /// and returns it upon completion
554        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
555    },
556    Done,
557}
558
559impl<T> RequestState<T>
560where
561    T: AsyncFileReader + Unpin + Send + 'static,
562{
563    /// Issue a request to fetch `ranges`, returning the Outstanding state
564    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
565        let ranges_captured = ranges.clone();
566
        // Note this must move the input *into* the future because the
        // get_byte_ranges future has a lifetime (i.e. it can hold references
        // internally) and thus must own the input while the request is
        // outstanding.
571        let future = async move {
572            let data = input.get_byte_ranges(ranges_captured).await?;
573            Ok((input, data))
574        }
575        .boxed();
576        RequestState::Outstanding { ranges, future }
577    }
578}
579
580impl<T> std::fmt::Debug for RequestState<T> {
581    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582        match self {
583            RequestState::None { input: _ } => f
584                .debug_struct("RequestState::None")
585                .field("input", &"...")
586                .finish(),
587            RequestState::Outstanding { ranges, .. } => f
588                .debug_struct("RequestState::Outstanding")
589                .field("ranges", &ranges)
590                .finish(),
591            RequestState::Done => {
592                write!(f, "RequestState::Done")
593            }
594        }
595    }
596}
597
/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
599///
600/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
601/// allowing users to decode record batches separately from I/O.
602///
603/// # I/O Buffering
604///
/// `ParquetRecordBatchStream` buffers *all* data pages selected after applying
/// predicates (projection, filtering, etc.) and decodes the rows from those buffered pages.
607///
608/// For example, if all rows and columns are selected, the entire row group is
609/// buffered in memory during decode. This minimizes the number of IO operations
610/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds.
612///
613/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over
614/// buffering.
615///
616/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
617pub struct ParquetRecordBatchStream<T> {
618    /// Output schema of the stream
619    schema: SchemaRef,
620    /// Input and Outstanding IO request, if any
621    request_state: RequestState<T>,
622    /// Decoding state machine (no IO)
623    decoder: ParquetPushDecoder,
624}
625
626impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
627    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
628        f.debug_struct("ParquetRecordBatchStream")
629            .field("request_state", &self.request_state)
630            .finish()
631    }
632}
633
634impl<T> ParquetRecordBatchStream<T> {
635    /// Returns the projected [`SchemaRef`] for reading the parquet file.
636    ///
637    /// Note that the schema metadata will be stripped here. See
638    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
639    pub fn schema(&self) -> &SchemaRef {
640        &self.schema
641    }
642}
643
644impl<T> ParquetRecordBatchStream<T>
645where
646    T: AsyncFileReader + Unpin + Send + 'static,
647{
648    /// Fetches the next row group from the stream.
649    ///
650    /// Users can continue to call this function to get row groups and decode them concurrently.
651    ///
652    /// ## Notes
653    ///
654    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
655    ///
656    /// ## Returns
657    ///
658    /// - `Ok(None)` if the stream has ended.
659    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
660    /// - `Ok(Some(reader))` which holds all the data for the row group.
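    ///
    /// ## Example
    ///
    /// A sketch of draining the stream one row group at a time; the data
    /// written in the hidden setup is illustrative only:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
    /// # use tempfile::tempfile;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # let mut file = tempfile().unwrap();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let file = tokio::fs::File::from_std(file);
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    /// // Fetch each row group's data, then decode its batches synchronously
    /// while let Some(reader) = stream.next_row_group().await.unwrap() {
    ///     for batch in reader {
    ///         assert_eq!(batch.unwrap().num_rows(), 3);
    ///     }
    /// }
    /// # }
    /// ```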
661    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
662        loop {
663            // Take ownership of request state to process, leaving self in a
664            // valid state
665            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
666            match request_state {
667                // No outstanding requests, proceed to setup next row group
668                RequestState::None { input } => {
669                    match self.decoder.try_next_reader()? {
670                        DecodeResult::NeedsData(ranges) => {
671                            self.request_state = RequestState::begin_request(input, ranges);
672                            continue; // poll again (as the input might be ready immediately)
673                        }
674                        DecodeResult::Data(reader) => {
675                            self.request_state = RequestState::None { input };
676                            return Ok(Some(reader));
677                        }
678                        DecodeResult::Finished => return Ok(None),
679                    }
680                }
681                RequestState::Outstanding { ranges, future } => {
682                    let (input, data) = future.await?;
683                    // Push the requested data to the decoder and try again
684                    self.decoder.push_ranges(ranges, data)?;
685                    self.request_state = RequestState::None { input };
686                    continue; // try and decode on next iteration
687                }
688                RequestState::Done => {
689                    self.request_state = RequestState::Done;
690                    return Ok(None);
691                }
692            }
693        }
694    }
695}
696
697impl<T> Stream for ParquetRecordBatchStream<T>
698where
699    T: AsyncFileReader + Unpin + Send + 'static,
700{
701    type Item = Result<RecordBatch>;
702    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
703        match self.poll_next_inner(cx) {
704            Ok(res) => {
705                // Successfully decoded a batch, or reached end of stream.
706                // convert Option<RecordBatch> to Option<Result<RecordBatch>>
707                res.map(|res| Ok(res).transpose())
708            }
709            Err(e) => {
710                self.request_state = RequestState::Done;
711                Poll::Ready(Some(Err(e)))
712            }
713        }
714    }
715}
716
717impl<T> ParquetRecordBatchStream<T>
718where
719    T: AsyncFileReader + Unpin + Send + 'static,
720{
721    /// Inner state machine
722    ///
    /// Note this is separate from `poll_next` so the `?` operator can be used to check for
    /// errors, as it returns `Result<Poll<Option<RecordBatch>>>`
725    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
726        loop {
727            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
728            match request_state {
729                RequestState::None { input } => {
730                    // No outstanding requests, proceed to decode the next batch
731                    match self.decoder.try_decode()? {
732                        DecodeResult::NeedsData(ranges) => {
733                            self.request_state = RequestState::begin_request(input, ranges);
734                            continue; // poll again (as the input might be ready immediately)
735                        }
736                        DecodeResult::Data(batch) => {
737                            self.request_state = RequestState::None { input };
738                            return Ok(Poll::Ready(Some(batch)));
739                        }
740                        DecodeResult::Finished => {
741                            self.request_state = RequestState::Done;
742                            return Ok(Poll::Ready(None));
743                        }
744                    }
745                }
746                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
747                    // Data was ready, push it to the decoder and continue
748                    Poll::Ready(result) => {
749                        let (input, data) = result?;
750                        // Push the requested data to the decoder
751                        self.decoder.push_ranges(ranges, data)?;
752                        self.request_state = RequestState::None { input };
753                        continue; // next iteration will try to decode the next batch
754                    }
755                    Poll::Pending => {
756                        self.request_state = RequestState::Outstanding { ranges, future };
757                        return Ok(Poll::Pending);
758                    }
759                },
760                RequestState::Done => {
761                    // Stream is done (error or end), return None
762                    self.request_state = RequestState::Done;
763                    return Ok(Poll::Ready(None));
764                }
765            }
766        }
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
774    use crate::arrow::arrow_reader::{
775        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
776    };
777    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
778    use crate::arrow::schema::virtual_type::RowNumber;
779    use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
780    use crate::file::metadata::PageIndexPolicy;
781    use crate::file::metadata::ParquetMetaDataReader;
782    use crate::file::properties::WriterProperties;
783    use arrow::compute::kernels::cmp::eq;
784    use arrow::error::Result as ArrowResult;
785    use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
786    use arrow_array::cast::AsArray;
787    use arrow_array::types::Int32Type;
788    use arrow_array::{
789        Array, ArrayRef, BooleanArray, Int32Array, RecordBatchReader, Scalar, StringArray,
790        StructArray, UInt64Array,
791    };
792    use arrow_schema::{DataType, Field, Schema};
793    use futures::{StreamExt, TryStreamExt};
794    use rand::{Rng, rng};
795    use std::collections::HashMap;
796    use std::sync::{Arc, Mutex};
797    use tempfile::tempfile;
798
799    #[derive(Clone)]
800    struct TestReader {
801        data: Bytes,
802        metadata: Option<Arc<ParquetMetaData>>,
803        requests: Arc<Mutex<Vec<Range<usize>>>>,
804    }
805
806    impl TestReader {
807        fn new(data: Bytes) -> Self {
808            Self {
809                data,
810                metadata: Default::default(),
811                requests: Default::default(),
812            }
813        }
814    }
815
816    impl AsyncFileReader for TestReader {
817        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
818            let range = range.clone();
819            self.requests
820                .lock()
821                .unwrap()
822                .push(range.start as usize..range.end as usize);
823            futures::future::ready(Ok(self
824                .data
825                .slice(range.start as usize..range.end as usize)))
826            .boxed()
827        }
828
829        fn get_metadata<'a>(
830            &'a mut self,
831            options: Option<&'a ArrowReaderOptions>,
832        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
833            let mut metadata_reader = ParquetMetaDataReader::new();
834            if let Some(opts) = options {
835                metadata_reader = metadata_reader
836                    .with_column_index_policy(opts.column_index_policy())
837                    .with_offset_index_policy(opts.offset_index_policy());
838            }
839            self.metadata = Some(Arc::new(
840                metadata_reader.parse_and_finish(&self.data).unwrap(),
841            ));
842            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
843        }
844    }
845
846    #[tokio::test]
847    async fn test_async_reader() {
848        let testdata = arrow::util::test_util::parquet_test_data();
849        let path = format!("{testdata}/alltypes_plain.parquet");
850        let data = Bytes::from(std::fs::read(path).unwrap());
851
852        let async_reader = TestReader::new(data.clone());
853
854        let requests = async_reader.requests.clone();
855        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
856            .await
857            .unwrap();
858
859        let metadata = builder.metadata().clone();
860        assert_eq!(metadata.num_row_groups(), 1);
861
862        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
863        let stream = builder
864            .with_projection(mask.clone())
865            .with_batch_size(1024)
866            .build()
867            .unwrap();
868
869        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
870
871        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
872            .unwrap()
873            .with_projection(mask)
874            .with_batch_size(104)
875            .build()
876            .unwrap()
877            .collect::<ArrowResult<Vec<_>>>()
878            .unwrap();
879
880        assert_eq!(async_batches, sync_batches);
881
882        let requests = requests.lock().unwrap();
883        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
884        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
885
886        assert_eq!(
887            &requests[..],
888            &[
889                offset_1 as usize..(offset_1 + length_1) as usize,
890                offset_2 as usize..(offset_2 + length_2) as usize
891            ]
892        );
893    }
894
895    #[tokio::test]
896    async fn test_async_reader_with_next_row_group() {
897        let testdata = arrow::util::test_util::parquet_test_data();
898        let path = format!("{testdata}/alltypes_plain.parquet");
899        let data = Bytes::from(std::fs::read(path).unwrap());
900
901        let async_reader = TestReader::new(data.clone());
902
903        let requests = async_reader.requests.clone();
904        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
905            .await
906            .unwrap();
907
908        let metadata = builder.metadata().clone();
909        assert_eq!(metadata.num_row_groups(), 1);
910
911        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
912        let mut stream = builder
913            .with_projection(mask.clone())
914            .with_batch_size(1024)
915            .build()
916            .unwrap();
917
918        let mut readers = vec![];
919        while let Some(reader) = stream.next_row_group().await.unwrap() {
920            readers.push(reader);
921        }
922
923        let async_batches: Vec<_> = readers
924            .into_iter()
925            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
926            .collect();
927
928        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
929            .unwrap()
930            .with_projection(mask)
931            .with_batch_size(104)
932            .build()
933            .unwrap()
934            .collect::<ArrowResult<Vec<_>>>()
935            .unwrap();
936
937        assert_eq!(async_batches, sync_batches);
938
939        let requests = requests.lock().unwrap();
940        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
941        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
942
943        assert_eq!(
944            &requests[..],
945            &[
946                offset_1 as usize..(offset_1 + length_1) as usize,
947                offset_2 as usize..(offset_2 + length_2) as usize
948            ]
949        );
950    }
951
952    #[tokio::test]
953    async fn test_async_reader_with_index() {
954        let testdata = arrow::util::test_util::parquet_test_data();
955        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
956        let data = Bytes::from(std::fs::read(path).unwrap());
957
958        let async_reader = TestReader::new(data.clone());
959
960        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
961        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
962            .await
963            .unwrap();
964
965        // The builder should have page and offset indexes loaded now
966        let metadata_with_index = builder.metadata();
967        assert_eq!(metadata_with_index.num_row_groups(), 1);
968
969        // Check offset indexes are present for all columns
970        let offset_index = metadata_with_index.offset_index().unwrap();
971        let column_index = metadata_with_index.column_index().unwrap();
972
973        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
974        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
975
976        let num_columns = metadata_with_index
977            .file_metadata()
978            .schema_descr()
979            .num_columns();
980
981        // Check page indexes are present for all columns
982        offset_index
983            .iter()
984            .for_each(|x| assert_eq!(x.len(), num_columns));
985        column_index
986            .iter()
987            .for_each(|x| assert_eq!(x.len(), num_columns));
988
989        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
990        let stream = builder
991            .with_projection(mask.clone())
992            .with_batch_size(1024)
993            .build()
994            .unwrap();
995
996        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
997
998        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
999            .unwrap()
1000            .with_projection(mask)
1001            .with_batch_size(1024)
1002            .build()
1003            .unwrap()
1004            .collect::<ArrowResult<Vec<_>>>()
1005            .unwrap();
1006
1007        assert_eq!(async_batches, sync_batches);
1008    }
1009
1010    #[tokio::test]
1011    async fn test_async_reader_with_limit() {
1012        let testdata = arrow::util::test_util::parquet_test_data();
1013        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1014        let data = Bytes::from(std::fs::read(path).unwrap());
1015
1016        let metadata = ParquetMetaDataReader::new()
1017            .parse_and_finish(&data)
1018            .unwrap();
1019        let metadata = Arc::new(metadata);
1020
1021        assert_eq!(metadata.num_row_groups(), 1);
1022
1023        let async_reader = TestReader::new(data.clone());
1024
1025        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1026            .await
1027            .unwrap();
1028
1029        assert_eq!(builder.metadata().num_row_groups(), 1);
1030
1031        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1032        let stream = builder
1033            .with_projection(mask.clone())
1034            .with_batch_size(1024)
1035            .with_limit(1)
1036            .build()
1037            .unwrap();
1038
1039        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1040
1041        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1042            .unwrap()
1043            .with_projection(mask)
1044            .with_batch_size(1024)
1045            .with_limit(1)
1046            .build()
1047            .unwrap()
1048            .collect::<ArrowResult<Vec<_>>>()
1049            .unwrap();
1050
1051        assert_eq!(async_batches, sync_batches);
1052    }
1053
1054    #[tokio::test]
1055    async fn test_async_reader_skip_pages() {
1056        let testdata = arrow::util::test_util::parquet_test_data();
1057        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1058        let data = Bytes::from(std::fs::read(path).unwrap());
1059
1060        let async_reader = TestReader::new(data.clone());
1061
1062        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1063        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1064            .await
1065            .unwrap();
1066
1067        assert_eq!(builder.metadata().num_row_groups(), 1);
1068
1069        let selection = RowSelection::from(vec![
1070            RowSelector::skip(21),   // Skip first page
1071            RowSelector::select(21), // Select page to boundary
1072            RowSelector::skip(41),   // Skip multiple pages
1073            RowSelector::select(41), // Select multiple pages
1074            RowSelector::skip(25),   // Skip page across boundary
1075            RowSelector::select(25), // Select across page boundary
1076            RowSelector::skip(7116), // Skip to final page boundary
1077            RowSelector::select(10), // Select final page
1078        ]);
1079
1080        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1081
1082        let stream = builder
1083            .with_projection(mask.clone())
1084            .with_row_selection(selection.clone())
1085            .build()
1086            .expect("building stream");
1087
1088        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1089
1090        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1091            .unwrap()
1092            .with_projection(mask)
1093            .with_batch_size(1024)
1094            .with_row_selection(selection)
1095            .build()
1096            .unwrap()
1097            .collect::<ArrowResult<Vec<_>>>()
1098            .unwrap();
1099
1100        assert_eq!(async_batches, sync_batches);
1101    }
1102
1103    #[tokio::test]
1104    async fn test_fuzz_async_reader_selection() {
1105        let testdata = arrow::util::test_util::parquet_test_data();
1106        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1107        let data = Bytes::from(std::fs::read(path).unwrap());
1108
1109        let mut rand = rng();
1110
1111        for _ in 0..100 {
1112            let mut expected_rows = 0;
1113            let mut total_rows = 0;
1114            let mut skip = false;
1115            let mut selectors = vec![];
1116
1117            while total_rows < 7300 {
1118                let row_count: usize = rand.random_range(1..100);
1119
1120                let row_count = row_count.min(7300 - total_rows);
1121
1122                selectors.push(RowSelector { row_count, skip });
1123
1124                total_rows += row_count;
1125                if !skip {
1126                    expected_rows += row_count;
1127                }
1128
1129                skip = !skip;
1130            }
1131
1132            let selection = RowSelection::from(selectors);
1133
1134            let async_reader = TestReader::new(data.clone());
1135
1136            let options =
1137                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1138            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1139                .await
1140                .unwrap();
1141
1142            assert_eq!(builder.metadata().num_row_groups(), 1);
1143
1144            let col_idx: usize = rand.random_range(0..13);
1145            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1146
1147            let stream = builder
1148                .with_projection(mask.clone())
1149                .with_row_selection(selection.clone())
1150                .build()
1151                .expect("building stream");
1152
1153            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1154
1155            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1156
1157            assert_eq!(actual_rows, expected_rows);
1158        }
1159    }
1160
1161    #[tokio::test]
1162    async fn test_async_reader_zero_row_selector() {
        // See https://github.com/apache/arrow-rs/issues/2669
1164        let testdata = arrow::util::test_util::parquet_test_data();
1165        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1166        let data = Bytes::from(std::fs::read(path).unwrap());
1167
1168        let mut rand = rng();
1169
1170        let mut expected_rows = 0;
1171        let mut total_rows = 0;
1172        let mut skip = false;
1173        let mut selectors = vec![];
1174
1175        selectors.push(RowSelector {
1176            row_count: 0,
1177            skip: false,
1178        });
1179
1180        while total_rows < 7300 {
1181            let row_count: usize = rand.random_range(1..100);
1182
1183            let row_count = row_count.min(7300 - total_rows);
1184
1185            selectors.push(RowSelector { row_count, skip });
1186
1187            total_rows += row_count;
1188            if !skip {
1189                expected_rows += row_count;
1190            }
1191
1192            skip = !skip;
1193        }
1194
1195        let selection = RowSelection::from(selectors);
1196
1197        let async_reader = TestReader::new(data.clone());
1198
1199        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1200        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1201            .await
1202            .unwrap();
1203
1204        assert_eq!(builder.metadata().num_row_groups(), 1);
1205
1206        let col_idx: usize = rand.random_range(0..13);
1207        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1208
1209        let stream = builder
1210            .with_projection(mask.clone())
1211            .with_row_selection(selection.clone())
1212            .build()
1213            .expect("building stream");
1214
1215        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1216
1217        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1218
1219        assert_eq!(actual_rows, expected_rows);
1220    }
1221
1222    #[tokio::test]
1223    async fn test_limit_multiple_row_groups() {
1224        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1225        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1226        let c = Int32Array::from_iter(0..6);
1227        let data = RecordBatch::try_from_iter([
1228            ("a", Arc::new(a) as ArrayRef),
1229            ("b", Arc::new(b) as ArrayRef),
1230            ("c", Arc::new(c) as ArrayRef),
1231        ])
1232        .unwrap();
1233
1234        let mut buf = Vec::with_capacity(1024);
1235        let props = WriterProperties::builder()
1236            .set_max_row_group_size(3)
1237            .build();
1238        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1239        writer.write(&data).unwrap();
1240        writer.close().unwrap();
1241
1242        let data: Bytes = buf.into();
1243        let metadata = ParquetMetaDataReader::new()
1244            .parse_and_finish(&data)
1245            .unwrap();
1246
1247        assert_eq!(metadata.num_row_groups(), 2);
1248
1249        let test = TestReader::new(data);
1250
1251        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1252            .await
1253            .unwrap()
1254            .with_batch_size(1024)
1255            .with_limit(4)
1256            .build()
1257            .unwrap();
1258
1259        let batches: Vec<_> = stream.try_collect().await.unwrap();
1260        // Expect one batch for each row group
1261        assert_eq!(batches.len(), 2);
1262
1263        let batch = &batches[0];
1264        // First batch should contain all rows
1265        assert_eq!(batch.num_rows(), 3);
1266        assert_eq!(batch.num_columns(), 3);
1267        let col2 = batch.column(2).as_primitive::<Int32Type>();
1268        assert_eq!(col2.values(), &[0, 1, 2]);
1269
1270        let batch = &batches[1];
1271        // Second batch should trigger the limit and only have one row
1272        assert_eq!(batch.num_rows(), 1);
1273        assert_eq!(batch.num_columns(), 3);
1274        let col2 = batch.column(2).as_primitive::<Int32Type>();
1275        assert_eq!(col2.values(), &[3]);
1276
1277        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1278            .await
1279            .unwrap()
1280            .with_offset(2)
1281            .with_limit(3)
1282            .build()
1283            .unwrap();
1284
1285        let batches: Vec<_> = stream.try_collect().await.unwrap();
1286        // Expect one batch for each row group
1287        assert_eq!(batches.len(), 2);
1288
1289        let batch = &batches[0];
1290        // First batch should contain one row
1291        assert_eq!(batch.num_rows(), 1);
1292        assert_eq!(batch.num_columns(), 3);
1293        let col2 = batch.column(2).as_primitive::<Int32Type>();
1294        assert_eq!(col2.values(), &[2]);
1295
1296        let batch = &batches[1];
1297        // Second batch should contain two rows
1298        assert_eq!(batch.num_rows(), 2);
1299        assert_eq!(batch.num_columns(), 3);
1300        let col2 = batch.column(2).as_primitive::<Int32Type>();
1301        assert_eq!(col2.values(), &[3, 4]);
1302
1303        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1304            .await
1305            .unwrap()
1306            .with_offset(4)
1307            .with_limit(20)
1308            .build()
1309            .unwrap();
1310
1311        let batches: Vec<_> = stream.try_collect().await.unwrap();
1312        // Should skip first row group
1313        assert_eq!(batches.len(), 1);
1314
1315        let batch = &batches[0];
1316        // First batch should contain two rows
1317        assert_eq!(batch.num_rows(), 2);
1318        assert_eq!(batch.num_columns(), 3);
1319        let col2 = batch.column(2).as_primitive::<Int32Type>();
1320        assert_eq!(col2.values(), &[4, 5]);
1321    }
1322
1323    #[tokio::test]
1324    async fn test_batch_size_overallocate() {
1325        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only has 8 rows
1327        let path = format!("{testdata}/alltypes_plain.parquet");
1328        let data = Bytes::from(std::fs::read(path).unwrap());
1329
1330        let async_reader = TestReader::new(data.clone());
1331
1332        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1333            .await
1334            .unwrap();
1335
1336        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1337
1338        let builder = builder
1339            .with_projection(ProjectionMask::all())
1340            .with_batch_size(1024);
1341
1342        // even though the batch size is set to 1024, it should adjust to the max
1343        // number of rows in the file (8)
1344        assert_ne!(1024, file_rows);
1345        assert_eq!(builder.batch_size, file_rows);
1346
1347        let _stream = builder.build().unwrap();
1348    }
1349
1350    #[tokio::test]
1351    async fn test_parquet_record_batch_stream_schema() {
1352        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1353            schema.flattened_fields().iter().map(|f| f.name()).collect()
1354        }
1355
1356        // ParquetRecordBatchReaderBuilder::schema differs from ParquetRecordBatchReader::schema
1357        // and RecordBatch::schema: the builder schema preserves all fields and any custom
1358        // metadata, while the reader and batch schemas expose only the projected fields and
1359        // drop the metadata. Test to ensure this behaviour remains consistent.
1360        //
1361        // Ensure the same holds for the asynchronous versions of the above.
1362
1363        // Prepare data for a schema with nested fields and custom metadata
1364        let mut metadata = HashMap::with_capacity(1);
1365        metadata.insert("key".to_string(), "value".to_string());
1366
1367        let nested_struct_array = StructArray::from(vec![
1368            (
1369                Arc::new(Field::new("d", DataType::Utf8, true)),
1370                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1371            ),
1372            (
1373                Arc::new(Field::new("e", DataType::Utf8, true)),
1374                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1375            ),
1376        ]);
1377        let struct_array = StructArray::from(vec![
1378            (
1379                Arc::new(Field::new("a", DataType::Int32, true)),
1380                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1381            ),
1382            (
1383                Arc::new(Field::new("b", DataType::UInt64, true)),
1384                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1385            ),
1386            (
1387                Arc::new(Field::new(
1388                    "c",
1389                    nested_struct_array.data_type().clone(),
1390                    true,
1391                )),
1392                Arc::new(nested_struct_array) as ArrayRef,
1393            ),
1394        ]);
1395
1396        let schema =
1397            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1398        let record_batch = RecordBatch::from(struct_array)
1399            .with_schema(schema.clone())
1400            .unwrap();
1401
1402        // Write a parquet file with the custom metadata in its schema
1403        let mut file = tempfile().unwrap();
1404        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1405        writer.write(&record_batch).unwrap();
1406        writer.close().unwrap();
1407
1408        let all_fields = ["a", "b", "c", "d", "e"];
1409        // (leaf indices in the projection mask, expected flattened field names in the output schema)
1410        let projections = [
1411            (vec![], vec![]),
1412            (vec![0], vec!["a"]),
1413            (vec![0, 1], vec!["a", "b"]),
1414            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1415            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1416        ];
1417
1418        // Ensure we're consistent for each of these projections
1419        for (indices, expected_projected_names) in projections {
1420            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1421                // Builder schema should preserve all fields and metadata
1422                assert_eq!(get_all_field_names(&builder), all_fields);
1423                assert_eq!(builder.metadata, metadata);
1424                // Reader & batch schema should show only projected fields, and no metadata
1425                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1426                assert_eq!(reader.metadata, HashMap::default());
1427                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1428                assert_eq!(batch.metadata, HashMap::default());
1429            };
1430
1431            let builder =
1432                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1433            let sync_builder_schema = builder.schema().clone();
1434            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1435            let mut reader = builder.with_projection(mask).build().unwrap();
1436            let sync_reader_schema = reader.schema();
1437            let batch = reader.next().unwrap().unwrap();
1438            let sync_batch_schema = batch.schema();
1439            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1440
1441            // The asynchronous reader should behave the same
1442            let file = tokio::fs::File::from(file.try_clone().unwrap());
1443            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1444            let async_builder_schema = builder.schema().clone();
1445            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1446            let mut reader = builder.with_projection(mask).build().unwrap();
1447            let async_reader_schema = reader.schema().clone();
1448            let batch = reader.next().await.unwrap().unwrap();
1449            let async_batch_schema = batch.schema();
1450            assert_schemas(
1451                async_builder_schema,
1452                async_reader_schema,
1453                async_batch_schema,
1454            );
1455        }
1456    }
1457
1458    #[tokio::test]
1459    async fn test_nested_skip() {
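        // Verify that row selections whose skips cross data page boundaries work for nested (list) columns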
1460        let schema = Arc::new(Schema::new(vec![
1461            Field::new("col_1", DataType::UInt64, false),
1462            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1463        ]));
1464
1465        // Writer properties sized so each column chunk contains multiple data pages
1466        let props = WriterProperties::builder()
1467            .set_data_page_row_count_limit(256)
1468            .set_write_batch_size(256)
1469            .set_max_row_group_size(1024);
1470
1471        // Write data
1472        let mut file = tempfile().unwrap();
1473        let mut writer =
1474            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1475
1476        let mut builder = ListBuilder::new(StringBuilder::new());
1477        for id in 0..1024 {
1478            match id % 3 {
1479                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1480                1 => builder.append_value([Some(format!("id_{id}"))]),
1481                _ => builder.append_null(),
1482            }
1483        }
1484        let refs = vec![
1485            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1486            Arc::new(builder.finish()) as ArrayRef,
1487        ];
1488
1489        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1490        writer.write(&batch).unwrap();
1491        writer.close().unwrap();
1492
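        // Selections chosen to start and stop skips near the 256-row data page boundaries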
1493        let selections = [
1494            RowSelection::from(vec![
1495                RowSelector::skip(313),
1496                RowSelector::select(1),
1497                RowSelector::skip(709),
1498                RowSelector::select(1),
1499            ]),
1500            RowSelection::from(vec![
1501                RowSelector::skip(255),
1502                RowSelector::select(1),
1503                RowSelector::skip(767),
1504                RowSelector::select(1),
1505            ]),
1506            RowSelection::from(vec![
1507                RowSelector::select(255),
1508                RowSelector::skip(1),
1509                RowSelector::select(767),
1510                RowSelector::skip(1),
1511            ]),
1512            RowSelection::from(vec![
1513                RowSelector::skip(254),
1514                RowSelector::select(1),
1515                RowSelector::select(1),
1516                RowSelector::skip(767),
1517                RowSelector::select(1),
1518            ]),
1519        ];
1520
1521        for selection in selections {
1522            let expected = selection.row_count();
1523            // Read data
1524            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1525                tokio::fs::File::from_std(file.try_clone().unwrap()),
1526                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1527            )
1528            .await
1529            .unwrap();
1530
1531            reader = reader.with_row_selection(selection);
1532
1533            let mut stream = reader.build().unwrap();
1534
1535            let mut total_rows = 0;
1536            while let Some(rb) = stream.next().await {
1537                let rb = rb.unwrap();
1538                total_rows += rb.num_rows();
1539            }
1540            assert_eq!(total_rows, expected);
1541        }
1542    }
1543
1544    #[tokio::test]
1545    #[allow(deprecated)]
1546    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1547        use tokio::fs::File;
1548        let testdata = arrow::util::test_util::parquet_test_data();
1549        let path = format!("{testdata}/alltypes_plain.parquet");
1550        let mut file = File::open(&path).await.unwrap();
1551        let file_size = file.metadata().await.unwrap().len();
1552        let mut metadata = ParquetMetaDataReader::new()
1553            .with_page_indexes(true)
1554            .load_and_finish(&mut file, file_size)
1555            .await
1556            .unwrap();
1557
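        // Replace the offset index with an empty one; reading the row group should not panic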
1558        metadata.set_offset_index(Some(vec![]));
1559        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1560        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1561        let reader =
1562            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1563                .build()
1564                .unwrap();
1565
1566        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1567        assert_eq!(result.len(), 1);
1568    }
1569
1570    #[tokio::test]
1571    #[allow(deprecated)]
1572    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1573        use tokio::fs::File;
1574        let testdata = arrow::util::test_util::parquet_test_data();
1575        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1576        let mut file = File::open(&path).await.unwrap();
1577        let file_size = file.metadata().await.unwrap().len();
1578        let metadata = ParquetMetaDataReader::new()
1579            .with_page_indexes(true)
1580            .load_and_finish(&mut file, file_size)
1581            .await
1582            .unwrap();
1583
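        // Unlike the test above, the offset index is left populated; reading should still succeed without panicking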
1584        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1585        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1586        let reader =
1587            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1588                .build()
1589                .unwrap();
1590
1591        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1592        assert_eq!(result.len(), 8);
1593    }
1594
1595    #[tokio::test]
1596    #[allow(deprecated)]
1597    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1598        use tempfile::TempDir;
1599        use tokio::fs::File;
1600        fn write_metadata_to_local_file(
1601            metadata: ParquetMetaData,
1602            file: impl AsRef<std::path::Path>,
1603        ) {
1604            use crate::file::metadata::ParquetMetaDataWriter;
1605            use std::fs::File;
1606            let file = File::create(file).unwrap();
1607            ParquetMetaDataWriter::new(file, &metadata)
1608                .finish()
1609                .unwrap()
1610        }
1611
1612        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
1613            use std::fs::File;
1614            let file = File::open(file).unwrap();
1615            ParquetMetaDataReader::new()
1616                .with_page_indexes(true)
1617                .parse_and_finish(&file)
1618                .unwrap()
1619        }
1620
1621        let testdata = arrow::util::test_util::parquet_test_data();
1622        let path = format!("{testdata}/alltypes_plain.parquet");
1623        let mut file = File::open(&path).await.unwrap();
1624        let file_size = file.metadata().await.unwrap().len();
1625        let metadata = ParquetMetaDataReader::new()
1626            .with_page_indexes(true)
1627            .load_and_finish(&mut file, file_size)
1628            .await
1629            .unwrap();
1630
1631        let tempdir = TempDir::new().unwrap();
1632        let metadata_path = tempdir.path().join("thrift_metadata.dat");
1633        write_metadata_to_local_file(metadata, &metadata_path);
1634        let metadata = read_metadata_from_local_file(&metadata_path);
1635
1636        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1637        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1638        let reader =
1639            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1640                .build()
1641                .unwrap();
1642
1643        // This previously panicked; collecting the stream should now succeed
1644        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1645        assert_eq!(result.len(), 1);
1646    }
1647
1648    #[tokio::test]
1649    async fn test_cached_array_reader_sparse_offset_error() {
1650        use futures::TryStreamExt;
1651
1652        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
1653        use arrow_array::{BooleanArray, RecordBatch};
1654
1655        let testdata = arrow::util::test_util::parquet_test_data();
1656        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1657        let data = Bytes::from(std::fs::read(path).unwrap());
1658
1659        let async_reader = TestReader::new(data);
1660
1661        // Enable page index so the fetch logic loads only required pages
1662        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1663        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1664            .await
1665            .unwrap();
1666
1667        // Skip the first 22 rows (entire first Parquet page) and then select the
1668        // next 3 rows (22, 23, 24). This means the fetch step will not include
1669        // the first page starting at file offset 0.
1670        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
1671
1672        // Trivial predicate on column 0 that always returns `true`. Using the
1673        // same column in both predicate and projection activates the caching
1674        // layer (Producer/Consumer pattern).
1675        let parquet_schema = builder.parquet_schema();
1676        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
1677        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
1678            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1679        });
1680        let filter = RowFilter::new(vec![Box::new(always_true)]);
1681
1682        // Build the stream with batch size 8 so the cache reads whole batches
1683        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
1684        let stream = builder
1685            .with_batch_size(8)
1686            .with_projection(proj)
1687            .with_row_selection(selection)
1688            .with_row_filter(filter)
1689            .build()
1690            .unwrap();
1691
1692        // Collecting the stream should succeed; this scenario previously failed with a
1693        // sparse column chunk offset error.
1694        let _result: Vec<_> = stream.try_collect().await.unwrap();
1695    }
1696
1697    #[tokio::test]
1698    async fn test_predicate_cache_disabled() {
1699        let k = Int32Array::from_iter_values(0..10);
1700        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
1701
1702        let mut buf = Vec::new();
1703        // both the page row limit and batch size are set to 1 to create one page per row
1704        let props = WriterProperties::builder()
1705            .set_data_page_row_count_limit(1)
1706            .set_write_batch_size(1)
1707            .set_max_row_group_size(10)
1708            .set_write_page_header_statistics(true)
1709            .build();
1710        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1711        writer.write(&data).unwrap();
1712        writer.close().unwrap();
1713
1714        let data = Bytes::from(buf);
1715        let metadata = ParquetMetaDataReader::new()
1716            .with_page_index_policy(PageIndexPolicy::Required)
1717            .parse_and_finish(&data)
1718            .unwrap();
1719        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1720
1721        // RowFilter is not cloneable, so use a closure to build a fresh one for each reader
1722        let build_filter = || {
1723            let scalar = Int32Array::from_iter_values([5]);
1724            let predicate = ArrowPredicateFn::new(
1725                ProjectionMask::leaves(&parquet_schema, vec![0]),
1726                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
1727            );
1728            RowFilter::new(vec![Box::new(predicate)])
1729        };
1730
1731        // select only one of the pages
1732        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
1733
1734        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1735        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1736
1737        // using the predicate cache (default)
1738        let reader_with_cache = TestReader::new(data.clone());
1739        let requests_with_cache = reader_with_cache.requests.clone();
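        // `requests` records each byte range fetched through the TestReader so the I/O of the two runs can be compared below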
1740        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1741            reader_with_cache,
1742            reader_metadata.clone(),
1743        )
1744        .with_batch_size(1000)
1745        .with_row_selection(selection.clone())
1746        .with_row_filter(build_filter())
1747        .build()
1748        .unwrap();
1749        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
1750
1751        // disabling the predicate cache
1752        let reader_without_cache = TestReader::new(data);
1753        let requests_without_cache = reader_without_cache.requests.clone();
1754        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1755            reader_without_cache,
1756            reader_metadata,
1757        )
1758        .with_batch_size(1000)
1759        .with_row_selection(selection)
1760        .with_row_filter(build_filter())
1761        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
1762        .build()
1763        .unwrap();
1764        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
1765
1766        assert_eq!(batches_with_cache, batches_without_cache);
1767
1768        let requests_with_cache = requests_with_cache.lock().unwrap();
1769        let requests_without_cache = requests_without_cache.lock().unwrap();
1770
1771        // fewer requests are made without the predicate cache
1772        assert_eq!(requests_with_cache.len(), 11);
1773        assert_eq!(requests_without_cache.len(), 2);
1774
1775        // fewer bytes are retrieved without the predicate cache
1776        assert_eq!(
1777            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
1778            433
1779        );
1780        assert_eq!(
1781            requests_without_cache
1782                .iter()
1783                .map(|r| r.len())
1784                .sum::<usize>(),
1785            92
1786        );
1787    }
1788
1789    #[test]
1790    fn test_row_numbers_with_multiple_row_groups() {
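        // Read asynchronously with a virtual `row_number` column attached via ArrowReaderOptions::with_virtual_columns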
1791        test_row_numbers_with_multiple_row_groups_helper(
1792            false,
1793            |path, selection, _row_filter, batch_size| {
1794                let runtime = tokio::runtime::Builder::new_current_thread()
1795                    .enable_all()
1796                    .build()
1797                    .expect("Could not create runtime");
1798                runtime.block_on(async move {
1799                    let file = tokio::fs::File::open(path).await.unwrap();
1800                    let row_number_field = Arc::new(
1801                        Field::new("row_number", DataType::Int64, false)
1802                            .with_extension_type(RowNumber),
1803                    );
1804                    let options = ArrowReaderOptions::new()
1805                        .with_virtual_columns(vec![row_number_field])
1806                        .unwrap();
1807                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1808                        .await
1809                        .unwrap()
1810                        .with_row_selection(selection)
1811                        .with_batch_size(batch_size)
1812                        .build()
1813                        .expect("Could not create reader");
1814                    reader.try_collect::<Vec<_>>().await.unwrap()
1815                })
1816            },
1817        );
1818    }
1819
1820    #[test]
1821    fn test_row_numbers_with_multiple_row_groups_and_filter() {
1822        test_row_numbers_with_multiple_row_groups_helper(
1823            true,
1824            |path, selection, row_filter, batch_size| {
1825                let runtime = tokio::runtime::Builder::new_current_thread()
1826                    .enable_all()
1827                    .build()
1828                    .expect("Could not create runtime");
1829                runtime.block_on(async move {
1830                    let file = tokio::fs::File::open(path).await.unwrap();
1831                    let row_number_field = Arc::new(
1832                        Field::new("row_number", DataType::Int64, false)
1833                            .with_extension_type(RowNumber),
1834                    );
1835                    let options = ArrowReaderOptions::new()
1836                        .with_virtual_columns(vec![row_number_field])
1837                        .unwrap();
1838                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1839                        .await
1840                        .unwrap()
1841                        .with_row_selection(selection)
1842                        .with_row_filter(row_filter.expect("No row filter"))
1843                        .with_batch_size(batch_size)
1844                        .build()
1845                        .expect("Could not create reader");
1846                    reader.try_collect::<Vec<_>>().await.unwrap()
1847                })
1848            },
1849        );
1850    }
1851
1852    #[tokio::test]
1853    async fn test_nested_lists() -> Result<()> {
1854        // Test case for https://github.com/apache/arrow-rs/issues/8657
1855        let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
1856        let table_schema = Arc::new(Schema::new(vec![
1857            Field::new("id", DataType::Int32, false),
1858            Field::new("vector", DataType::List(list_inner_field.clone()), true),
1859        ]));
1860
1861        let mut list_builder =
1862            ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
1863        list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
1864        list_builder.append(true);
1865        list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
1866        list_builder.append(true);
1867        list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
1868        list_builder.append(true);
1869        list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
1870        list_builder.append(true);
1871        let list_array = list_builder.finish();
1872
1873        let data = vec![RecordBatch::try_new(
1874            table_schema.clone(),
1875            vec![
1876                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
1877                Arc::new(list_array),
1878            ],
1879        )?];
1880
1881        let mut buffer = Vec::new();
1882        let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
1883
1884        for batch in data {
1885            writer.write(&batch).await?;
1886        }
1887
1888        writer.close().await?;
1889
1890        let reader = TestReader::new(Bytes::from(buffer));
1891        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
1892
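        // Pass-through predicate so the row filter path is exercised over the nested list column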
1893        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
1894            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1895        });
1896
1897        let projection_mask = ProjectionMask::all();
1898
1899        let mut stream = builder
1900            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1901            .with_projection(projection_mask)
1902            .build()?;
1903
1904        while let Some(batch) = stream.next().await {
1905            let _ = batch.unwrap(); // ensure there is no panic
1906        }
1907
1908        Ok(())
1909    }
1910}