parquet/arrow/async_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
61/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
62///
63/// Notes:
64///
65/// 1. There is a default implementation for types that implement [`AsyncRead`]
66///    and [`AsyncSeek`], for example [`tokio::fs::File`].
67///
68/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
69///    is enabled, implements this interface for [`ObjectStore`].
70///
71/// [`ObjectStore`]: object_store::ObjectStore
72///
73/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
74pub trait AsyncFileReader: Send {
75    /// Retrieve the bytes in `range`
76    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
78    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
79    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80        async move {
81            let mut result = Vec::with_capacity(ranges.len());
82
83            for range in ranges.into_iter() {
84                let data = self.get_bytes(range).await?;
85                result.push(data);
86            }
87
88            Ok(result)
89        }
90        .boxed()
91    }
92
93    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
94    ///
95    /// This is an asynchronous operation as it may involve reading the file
96    /// footer and potentially other metadata from disk or a remote source.
97    ///
98    /// Reading data from Parquet requires the metadata to understand the
99    /// schema, row groups, and location of pages within the file. This metadata
100    /// is stored primarily in the footer of the Parquet file, and can be read using
101    /// [`ParquetMetaDataReader`].
102    ///
103    /// However, implementations can significantly speed up reading Parquet by
104    /// supplying cached metadata or pre-fetched metadata via this API.
105    ///
106    /// # Parameters
107    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
108    ///   and other options that affect how the metadata is read.
109    fn get_metadata<'a>(
110        &'a mut self,
111        options: Option<&'a ArrowReaderOptions>,
112    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
115/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
116impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118        self.as_mut().get_bytes(range)
119    }
120
121    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122        self.as_mut().get_byte_ranges(ranges)
123    }
124
125    fn get_metadata<'a>(
126        &'a mut self,
127        options: Option<&'a ArrowReaderOptions>,
128    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129        self.as_mut().get_metadata(options)
130    }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135        async move {
136            self.seek(SeekFrom::End(-(suffix as i64))).await?;
137            let mut buf = Vec::with_capacity(suffix);
138            self.take(suffix as _).read_to_end(&mut buf).await?;
139            Ok(buf.into())
140        }
141        .boxed()
142    }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147        async move {
148            self.seek(SeekFrom::Start(range.start)).await?;
149
150            let to_read = range.end - range.start;
151            let mut buffer = Vec::with_capacity(to_read.try_into()?);
152            let read = self.take(to_read).read_to_end(&mut buffer).await?;
153            if read as u64 != to_read {
154                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155            }
156
157            Ok(buffer.into())
158        }
159        .boxed()
160    }
161
162    fn get_metadata<'a>(
163        &'a mut self,
164        options: Option<&'a ArrowReaderOptions>,
165    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166        async move {
167            let metadata_opts = options.map(|o| o.metadata_options().clone());
168            let metadata_reader = ParquetMetaDataReader::new()
169                .with_page_index_policy(PageIndexPolicy::from(
170                    options.is_some_and(|o| o.page_index()),
171                ))
172                .with_metadata_options(metadata_opts);
173
174            #[cfg(feature = "encryption")]
175            let metadata_reader = metadata_reader.with_decryption_properties(
176                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
177            );
178
179            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
180            Ok(Arc::new(parquet_metadata))
181        }
182        .boxed()
183    }
184}
185
186impl ArrowReaderMetadata {
187    /// Returns a new [`ArrowReaderMetadata`] for this builder
188    ///
189    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
190    pub async fn load_async<T: AsyncFileReader>(
191        input: &mut T,
192        options: ArrowReaderOptions,
193    ) -> Result<Self> {
194        let metadata = input.get_metadata(Some(&options)).await?;
195        Self::try_new(metadata, options)
196    }
197}
198
199#[doc(hidden)]
200/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync readers from async
201///
202/// Allows sharing the same builder for different readers while keeping the same
203/// ParquetRecordBatchStreamBuilder API
204pub struct AsyncReader<T>(T);
205
206/// A builder for reading parquet files from an `async` source as  [`ParquetRecordBatchStream`]
207///
208/// This can be used to decode a Parquet file in streaming fashion (without
209/// downloading the whole file at once) from a remote source, such as an object store.
210///
211/// This builder handles reading the parquet file metadata, allowing consumers
212/// to use this information to select what specific columns, row groups, etc.
213/// they wish to be read by the resulting stream.
214///
215/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
216///
217/// See [`ArrowReaderBuilder`] for additional member functions
218pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
219
220impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
221    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
222    /// specified source.
223    ///
224    /// # Example
225    /// ```
226    /// # #[tokio::main(flavor="current_thread")]
227    /// # async fn main() {
228    /// #
229    /// # use arrow_array::RecordBatch;
230    /// # use arrow::util::pretty::pretty_format_batches;
231    /// # use futures::TryStreamExt;
232    /// #
233    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
234    /// #
235    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
236    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
237    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
238    /// #     assert_eq!(
239    /// #          &actual_lines, expected_lines,
240    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
241    /// #          expected_lines, actual_lines
242    /// #      );
243    /// #  }
244    /// #
245    /// # let testdata = arrow::util::test_util::parquet_test_data();
246    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
247    /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
248    /// // another async I/O reader such as a reader from an object store.
249    /// let file = tokio::fs::File::open(path).await.unwrap();
250    ///
251    /// // Configure options for reading from the async source
252    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
253    ///     .await
254    ///     .unwrap();
255    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
256    /// // a stream that can be used to incrementally read the data in batches
257    /// let stream = builder.build().unwrap();
258    /// // In this example, we collect the stream into a Vec<RecordBatch>
259    /// // but real applications would likely process the batches as they are read
260    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
261    /// // Demonstrate the results are as expected
262    /// assert_batches_eq(
263    ///     &results,
264    ///     &[
265    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
266    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
267    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
268    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
269    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
270    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
271    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
272    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
273    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
274    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
275    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
276    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
277    ///      ],
278    ///  );
279    /// # }
280    /// ```
281    ///
282    /// # Example configuring options and reading metadata
283    ///
284    /// There are many options that control the behavior of the reader, such as
285    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
286    ///
287    /// ```
288    /// # #[tokio::main(flavor="current_thread")]
289    /// # async fn main() {
290    /// #
291    /// # use arrow_array::RecordBatch;
292    /// # use arrow::util::pretty::pretty_format_batches;
293    /// # use futures::TryStreamExt;
294    /// #
295    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
296    /// #
297    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
298    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
299    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
300    /// #     assert_eq!(
301    /// #          &actual_lines, expected_lines,
302    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
303    /// #          expected_lines, actual_lines
304    /// #      );
305    /// #  }
306    /// #
307    /// # let testdata = arrow::util::test_util::parquet_test_data();
308    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
309    /// // As before, use tokio::fs::File to read data using an async I/O.
310    /// let file = tokio::fs::File::open(path).await.unwrap();
311    ///
312    /// // Configure options for reading from the async source, in this case we set the batch size
313    /// // to 3 which produces 3 rows at a time.
314    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
315    ///     .await
316    ///     .unwrap()
317    ///     .with_batch_size(3);
318    ///
319    /// // We can also read the metadata to inspect the schema and other metadata
320    /// // before actually reading the data
321    /// let file_metadata = builder.metadata().file_metadata();
322    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
323    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
324    ///
325    /// let stream = builder.with_projection(mask).build().unwrap();
326    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
327    /// // Print out the results
328    /// assert_batches_eq(
329    ///     &results,
330    ///     &[
331    ///         "+----------+-------------+-----------+",
332    ///         "| bool_col | tinyint_col | float_col |",
333    ///         "+----------+-------------+-----------+",
334    ///         "| true     | 0           | 0.0       |",
335    ///         "| false    | 1           | 1.1       |",
336    ///         "| true     | 0           | 0.0       |",
337    ///         "| false    | 1           | 1.1       |",
338    ///         "| true     | 0           | 0.0       |",
339    ///         "| false    | 1           | 1.1       |",
340    ///         "| true     | 0           | 0.0       |",
341    ///         "| false    | 1           | 1.1       |",
342    ///         "+----------+-------------+-----------+",
343    ///      ],
344    ///  );
345    ///
346    /// // The results has 8 rows, so since we set the batch size to 3, we expect
347    /// // 3 batches, two with 3 rows each and the last batch with 2 rows.
348    /// assert_eq!(results.len(), 3);
349    /// # }
350    /// ```
351    pub async fn new(input: T) -> Result<Self> {
352        Self::new_with_options(input, Default::default()).await
353    }
354
355    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
356    /// and [`ArrowReaderOptions`].
357    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
358        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
359        Ok(Self::new_with_metadata(input, metadata))
360    }
361
362    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
363    ///
364    /// This allows loading metadata once and using it to create multiple builders with
365    /// potentially different settings, that can be read in parallel.
366    ///
367    /// # Example of reading from multiple streams in parallel
368    ///
369    /// ```
370    /// # use std::fs::metadata;
371    /// # use std::sync::Arc;
372    /// # use bytes::Bytes;
373    /// # use arrow_array::{Int32Array, RecordBatch};
374    /// # use arrow_schema::{DataType, Field, Schema};
375    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
376    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
377    /// # use tempfile::tempfile;
378    /// # use futures::StreamExt;
379    /// # #[tokio::main(flavor="current_thread")]
380    /// # async fn main() {
381    /// #
382    /// # let mut file = tempfile().unwrap();
383    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
384    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
385    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
386    /// # writer.write(&batch).unwrap();
387    /// # writer.close().unwrap();
388    /// // open file with parquet data
389    /// let mut file = tokio::fs::File::from_std(file);
390    /// // load metadata once
391    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
392    /// // create two readers, a and b, from the same underlying file
393    /// // without reading the metadata again
394    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
395    ///     file.try_clone().await.unwrap(),
396    ///     meta.clone()
397    /// ).build().unwrap();
398    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
399    ///
400    /// // Can read batches from both readers in parallel
401    /// assert_eq!(
402    ///   a.next().await.unwrap().unwrap(),
403    ///   b.next().await.unwrap().unwrap(),
404    /// );
405    /// # }
406    /// ```
407    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
408        Self::new_builder(AsyncReader(input), metadata)
409    }
410
411    /// Read bloom filter for a column in a row group
412    ///
413    /// Returns `None` if the column does not have a bloom filter
414    ///
415    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
416    pub async fn get_row_group_column_bloom_filter(
417        &mut self,
418        row_group_idx: usize,
419        column_idx: usize,
420    ) -> Result<Option<Sbbf>> {
421        let metadata = self.metadata.row_group(row_group_idx);
422        let column_metadata = metadata.column(column_idx);
423
424        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
425            offset
426                .try_into()
427                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
428        } else {
429            return Ok(None);
430        };
431
432        let buffer = match column_metadata.bloom_filter_length() {
433            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
434            None => self
435                .input
436                .0
437                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
438        }
439        .await?;
440
441        let (header, bitset_offset) =
442            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
443
444        match header.algorithm {
445            BloomFilterAlgorithm::BLOCK => {
446                // this match exists to future proof the singleton algorithm enum
447            }
448        }
449        match header.compression {
450            BloomFilterCompression::UNCOMPRESSED => {
451                // this match exists to future proof the singleton compression enum
452            }
453        }
454        match header.hash {
455            BloomFilterHash::XXHASH => {
456                // this match exists to future proof the singleton hash enum
457            }
458        }
459
460        let bitset = match column_metadata.bloom_filter_length() {
461            Some(_) => buffer.slice(
462                (TryInto::<usize>::try_into(bitset_offset).unwrap()
463                    - TryInto::<usize>::try_into(offset).unwrap())..,
464            ),
465            None => {
466                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
467                    ParquetError::General("Bloom filter length is invalid".to_string())
468                })?;
469                self.input
470                    .0
471                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
472                    .await?
473            }
474        };
475        Ok(Some(Sbbf::new(&bitset)))
476    }
477
478    /// Build a new [`ParquetRecordBatchStream`]
479    ///
480    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
481    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
482        let Self {
483            input,
484            metadata,
485            schema,
486            fields,
487            batch_size,
488            row_groups,
489            projection,
490            filter,
491            selection,
492            row_selection_policy: selection_strategy,
493            limit,
494            offset,
495            metrics,
496            max_predicate_cache_size,
497        } = self;
498
499        // Ensure schema of ParquetRecordBatchStream respects projection, and does
500        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
501        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
502        let projected_fields = schema
503            .fields
504            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
505        let projected_schema = Arc::new(Schema::new(projected_fields));
506
507        let decoder = ParquetPushDecoderBuilder {
508            input: NoInput,
509            metadata,
510            schema,
511            fields,
512            projection,
513            filter,
514            selection,
515            row_selection_policy: selection_strategy,
516            batch_size,
517            row_groups,
518            limit,
519            offset,
520            metrics,
521            max_predicate_cache_size,
522        }
523        .build()?;
524
525        let request_state = RequestState::None { input: input.0 };
526
527        Ok(ParquetRecordBatchStream {
528            schema: projected_schema,
529            decoder,
530            request_state,
531        })
532    }
533}
534
535/// State machine that tracks outstanding requests to fetch data
536///
537/// The parameter `T` is the input, typically an `AsyncFileReader`
538enum RequestState<T> {
539    /// No outstanding requests
540    None {
541        input: T,
542    },
543    /// There is an outstanding request for data
544    Outstanding {
545        /// Ranges that have been requested
546        ranges: Vec<Range<u64>>,
547        /// Future that will resolve (input, requested_ranges)
548        ///
549        /// Note the future owns the reader while the request is outstanding
550        /// and returns it upon completion
551        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
552    },
553    Done,
554}
555
556impl<T> RequestState<T>
557where
558    T: AsyncFileReader + Unpin + Send + 'static,
559{
560    /// Issue a request to fetch `ranges`, returning the Outstanding state
561    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
562        let ranges_captured = ranges.clone();
563
564        // Note this must move the input *into* the future
565        // because the get_byte_ranges future has a lifetime
566        // (aka can have references internally) and thus must
567        // own the input while the request is outstanding.
568        let future = async move {
569            let data = input.get_byte_ranges(ranges_captured).await?;
570            Ok((input, data))
571        }
572        .boxed();
573        RequestState::Outstanding { ranges, future }
574    }
575}
576
577impl<T> std::fmt::Debug for RequestState<T> {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        match self {
580            RequestState::None { input: _ } => f
581                .debug_struct("RequestState::None")
582                .field("input", &"...")
583                .finish(),
584            RequestState::Outstanding { ranges, .. } => f
585                .debug_struct("RequestState::Outstanding")
586                .field("ranges", &ranges)
587                .finish(),
588            RequestState::Done => {
589                write!(f, "RequestState::Done")
590            }
591        }
592    }
593}
594
595/// An asynchronous [`Stream`]of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
596///
597/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
598/// allowing users to decode record batches separately from I/O.
599///
600/// # I/O Buffering
601///
602/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
603/// (projection + filtering, etc) and decodes the rows from those buffered pages.
604///
605/// For example, if all rows and columns are selected, the entire row group is
606/// buffered in memory during decode. This minimizes the number of IO operations
607/// required, which is especially important for object stores, where IO operations
608/// have latencies in the hundreds of milliseconds
609///
610/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over
611/// buffering.
612///
613/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
614pub struct ParquetRecordBatchStream<T> {
615    /// Output schema of the stream
616    schema: SchemaRef,
617    /// Input and Outstanding IO request, if any
618    request_state: RequestState<T>,
619    /// Decoding state machine (no IO)
620    decoder: ParquetPushDecoder,
621}
622
623impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
624    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
625        f.debug_struct("ParquetRecordBatchStream")
626            .field("request_state", &self.request_state)
627            .finish()
628    }
629}
630
631impl<T> ParquetRecordBatchStream<T> {
632    /// Returns the projected [`SchemaRef`] for reading the parquet file.
633    ///
634    /// Note that the schema metadata will be stripped here. See
635    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
636    pub fn schema(&self) -> &SchemaRef {
637        &self.schema
638    }
639}
640
641impl<T> ParquetRecordBatchStream<T>
642where
643    T: AsyncFileReader + Unpin + Send + 'static,
644{
645    /// Fetches the next row group from the stream.
646    ///
647    /// Users can continue to call this function to get row groups and decode them concurrently.
648    ///
649    /// ## Notes
650    ///
651    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
652    ///
653    /// ## Returns
654    ///
655    /// - `Ok(None)` if the stream has ended.
656    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
657    /// - `Ok(Some(reader))` which holds all the data for the row group.
658    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
659        loop {
660            // Take ownership of request state to process, leaving self in a
661            // valid state
662            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
663            match request_state {
664                // No outstanding requests, proceed to setup next row group
665                RequestState::None { input } => {
666                    match self.decoder.try_next_reader()? {
667                        DecodeResult::NeedsData(ranges) => {
668                            self.request_state = RequestState::begin_request(input, ranges);
669                            continue; // poll again (as the input might be ready immediately)
670                        }
671                        DecodeResult::Data(reader) => {
672                            self.request_state = RequestState::None { input };
673                            return Ok(Some(reader));
674                        }
675                        DecodeResult::Finished => return Ok(None),
676                    }
677                }
678                RequestState::Outstanding { ranges, future } => {
679                    let (input, data) = future.await?;
680                    // Push the requested data to the decoder and try again
681                    self.decoder.push_ranges(ranges, data)?;
682                    self.request_state = RequestState::None { input };
683                    continue; // try and decode on next iteration
684                }
685                RequestState::Done => {
686                    self.request_state = RequestState::Done;
687                    return Ok(None);
688                }
689            }
690        }
691    }
692}
693
694impl<T> Stream for ParquetRecordBatchStream<T>
695where
696    T: AsyncFileReader + Unpin + Send + 'static,
697{
698    type Item = Result<RecordBatch>;
699    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
700        match self.poll_next_inner(cx) {
701            Ok(res) => {
702                // Successfully decoded a batch, or reached end of stream.
703                // convert Option<RecordBatch> to Option<Result<RecordBatch>>
704                res.map(|res| Ok(res).transpose())
705            }
706            Err(e) => {
707                self.request_state = RequestState::Done;
708                Poll::Ready(Some(Err(e)))
709            }
710        }
711    }
712}
713
714impl<T> ParquetRecordBatchStream<T>
715where
716    T: AsyncFileReader + Unpin + Send + 'static,
717{
718    /// Inner state machine
719    ///
720    /// Note this is separate from poll_next so we can use ? operator to check for errors
721    /// as it returns `Result<Poll<Option<RecordBatch>>>`
722    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
723        loop {
724            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
725            match request_state {
726                RequestState::None { input } => {
727                    // No outstanding requests, proceed to decode the next batch
728                    match self.decoder.try_decode()? {
729                        DecodeResult::NeedsData(ranges) => {
730                            self.request_state = RequestState::begin_request(input, ranges);
731                            continue; // poll again (as the input might be ready immediately)
732                        }
733                        DecodeResult::Data(batch) => {
734                            self.request_state = RequestState::None { input };
735                            return Ok(Poll::Ready(Some(batch)));
736                        }
737                        DecodeResult::Finished => {
738                            self.request_state = RequestState::Done;
739                            return Ok(Poll::Ready(None));
740                        }
741                    }
742                }
743                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
744                    // Data was ready, push it to the decoder and continue
745                    Poll::Ready(result) => {
746                        let (input, data) = result?;
747                        // Push the requested data to the decoder
748                        self.decoder.push_ranges(ranges, data)?;
749                        self.request_state = RequestState::None { input };
750                        continue; // next iteration will try to decode the next batch
751                    }
752                    Poll::Pending => {
753                        self.request_state = RequestState::Outstanding { ranges, future };
754                        return Ok(Poll::Pending);
755                    }
756                },
757                RequestState::Done => {
758                    // Stream is done (error or end), return None
759                    self.request_state = RequestState::Done;
760                    return Ok(Poll::Ready(None));
761                }
762            }
763        }
764    }
765}
766
767#[cfg(test)]
768mod tests {
769    use super::*;
770    use crate::arrow::arrow_reader::RowSelectionPolicy;
771    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
772    use crate::arrow::arrow_reader::{
773        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
774    };
775    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
776    use crate::arrow::schema::virtual_type::RowNumber;
777    use crate::arrow::{ArrowWriter, ProjectionMask};
778    use crate::file::metadata::ParquetMetaDataReader;
779    use crate::file::properties::WriterProperties;
780    use arrow::compute::kernels::cmp::eq;
781    use arrow::compute::or;
782    use arrow::error::Result as ArrowResult;
783    use arrow_array::builder::{ListBuilder, StringBuilder};
784    use arrow_array::cast::AsArray;
785    use arrow_array::types::Int32Type;
786    use arrow_array::{
787        Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
788        StructArray, UInt64Array,
789    };
790    use arrow_schema::{DataType, Field, Schema};
791    use arrow_select::concat::concat_batches;
792    use futures::{StreamExt, TryStreamExt};
793    use rand::{Rng, rng};
794    use std::collections::HashMap;
795    use std::sync::{Arc, Mutex};
796    use tempfile::tempfile;
797
798    #[derive(Clone)]
799    struct TestReader {
800        data: Bytes,
801        metadata: Option<Arc<ParquetMetaData>>,
802        requests: Arc<Mutex<Vec<Range<usize>>>>,
803    }
804
805    impl TestReader {
806        fn new(data: Bytes) -> Self {
807            Self {
808                data,
809                metadata: Default::default(),
810                requests: Default::default(),
811            }
812        }
813    }
814
815    impl AsyncFileReader for TestReader {
816        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
817            let range = range.clone();
818            self.requests
819                .lock()
820                .unwrap()
821                .push(range.start as usize..range.end as usize);
822            futures::future::ready(Ok(self
823                .data
824                .slice(range.start as usize..range.end as usize)))
825            .boxed()
826        }
827
828        fn get_metadata<'a>(
829            &'a mut self,
830            options: Option<&'a ArrowReaderOptions>,
831        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
832            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
833                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
834            );
835            self.metadata = Some(Arc::new(
836                metadata_reader.parse_and_finish(&self.data).unwrap(),
837            ));
838            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
839        }
840    }
841
842    #[tokio::test]
843    async fn test_async_reader() {
844        let testdata = arrow::util::test_util::parquet_test_data();
845        let path = format!("{testdata}/alltypes_plain.parquet");
846        let data = Bytes::from(std::fs::read(path).unwrap());
847
848        let async_reader = TestReader::new(data.clone());
849
850        let requests = async_reader.requests.clone();
851        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
852            .await
853            .unwrap();
854
855        let metadata = builder.metadata().clone();
856        assert_eq!(metadata.num_row_groups(), 1);
857
858        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
859        let stream = builder
860            .with_projection(mask.clone())
861            .with_batch_size(1024)
862            .build()
863            .unwrap();
864
865        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
866
867        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
868            .unwrap()
869            .with_projection(mask)
870            .with_batch_size(104)
871            .build()
872            .unwrap()
873            .collect::<ArrowResult<Vec<_>>>()
874            .unwrap();
875
876        assert_eq!(async_batches, sync_batches);
877
878        let requests = requests.lock().unwrap();
879        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
880        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
881
882        assert_eq!(
883            &requests[..],
884            &[
885                offset_1 as usize..(offset_1 + length_1) as usize,
886                offset_2 as usize..(offset_2 + length_2) as usize
887            ]
888        );
889    }
890
891    #[tokio::test]
892    async fn test_async_reader_with_next_row_group() {
893        let testdata = arrow::util::test_util::parquet_test_data();
894        let path = format!("{testdata}/alltypes_plain.parquet");
895        let data = Bytes::from(std::fs::read(path).unwrap());
896
897        let async_reader = TestReader::new(data.clone());
898
899        let requests = async_reader.requests.clone();
900        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
901            .await
902            .unwrap();
903
904        let metadata = builder.metadata().clone();
905        assert_eq!(metadata.num_row_groups(), 1);
906
907        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
908        let mut stream = builder
909            .with_projection(mask.clone())
910            .with_batch_size(1024)
911            .build()
912            .unwrap();
913
914        let mut readers = vec![];
915        while let Some(reader) = stream.next_row_group().await.unwrap() {
916            readers.push(reader);
917        }
918
919        let async_batches: Vec<_> = readers
920            .into_iter()
921            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
922            .collect();
923
924        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
925            .unwrap()
926            .with_projection(mask)
927            .with_batch_size(104)
928            .build()
929            .unwrap()
930            .collect::<ArrowResult<Vec<_>>>()
931            .unwrap();
932
933        assert_eq!(async_batches, sync_batches);
934
935        let requests = requests.lock().unwrap();
936        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
937        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
938
939        assert_eq!(
940            &requests[..],
941            &[
942                offset_1 as usize..(offset_1 + length_1) as usize,
943                offset_2 as usize..(offset_2 + length_2) as usize
944            ]
945        );
946    }
947
948    #[tokio::test]
949    async fn test_async_reader_with_index() {
950        let testdata = arrow::util::test_util::parquet_test_data();
951        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
952        let data = Bytes::from(std::fs::read(path).unwrap());
953
954        let async_reader = TestReader::new(data.clone());
955
956        let options = ArrowReaderOptions::new().with_page_index(true);
957        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
958            .await
959            .unwrap();
960
961        // The builder should have page and offset indexes loaded now
962        let metadata_with_index = builder.metadata();
963        assert_eq!(metadata_with_index.num_row_groups(), 1);
964
965        // Check offset indexes are present for all columns
966        let offset_index = metadata_with_index.offset_index().unwrap();
967        let column_index = metadata_with_index.column_index().unwrap();
968
969        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
970        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
971
972        let num_columns = metadata_with_index
973            .file_metadata()
974            .schema_descr()
975            .num_columns();
976
977        // Check page indexes are present for all columns
978        offset_index
979            .iter()
980            .for_each(|x| assert_eq!(x.len(), num_columns));
981        column_index
982            .iter()
983            .for_each(|x| assert_eq!(x.len(), num_columns));
984
985        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
986        let stream = builder
987            .with_projection(mask.clone())
988            .with_batch_size(1024)
989            .build()
990            .unwrap();
991
992        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
993
994        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
995            .unwrap()
996            .with_projection(mask)
997            .with_batch_size(1024)
998            .build()
999            .unwrap()
1000            .collect::<ArrowResult<Vec<_>>>()
1001            .unwrap();
1002
1003        assert_eq!(async_batches, sync_batches);
1004    }
1005
1006    #[tokio::test]
1007    async fn test_async_reader_with_limit() {
1008        let testdata = arrow::util::test_util::parquet_test_data();
1009        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1010        let data = Bytes::from(std::fs::read(path).unwrap());
1011
1012        let metadata = ParquetMetaDataReader::new()
1013            .parse_and_finish(&data)
1014            .unwrap();
1015        let metadata = Arc::new(metadata);
1016
1017        assert_eq!(metadata.num_row_groups(), 1);
1018
1019        let async_reader = TestReader::new(data.clone());
1020
1021        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1022            .await
1023            .unwrap();
1024
1025        assert_eq!(builder.metadata().num_row_groups(), 1);
1026
1027        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1028        let stream = builder
1029            .with_projection(mask.clone())
1030            .with_batch_size(1024)
1031            .with_limit(1)
1032            .build()
1033            .unwrap();
1034
1035        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1036
1037        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1038            .unwrap()
1039            .with_projection(mask)
1040            .with_batch_size(1024)
1041            .with_limit(1)
1042            .build()
1043            .unwrap()
1044            .collect::<ArrowResult<Vec<_>>>()
1045            .unwrap();
1046
1047        assert_eq!(async_batches, sync_batches);
1048    }
1049
1050    #[tokio::test]
1051    async fn test_async_reader_skip_pages() {
1052        let testdata = arrow::util::test_util::parquet_test_data();
1053        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1054        let data = Bytes::from(std::fs::read(path).unwrap());
1055
1056        let async_reader = TestReader::new(data.clone());
1057
1058        let options = ArrowReaderOptions::new().with_page_index(true);
1059        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1060            .await
1061            .unwrap();
1062
1063        assert_eq!(builder.metadata().num_row_groups(), 1);
1064
1065        let selection = RowSelection::from(vec![
1066            RowSelector::skip(21),   // Skip first page
1067            RowSelector::select(21), // Select page to boundary
1068            RowSelector::skip(41),   // Skip multiple pages
1069            RowSelector::select(41), // Select multiple pages
1070            RowSelector::skip(25),   // Skip page across boundary
1071            RowSelector::select(25), // Select across page boundary
1072            RowSelector::skip(7116), // Skip to final page boundary
1073            RowSelector::select(10), // Select final page
1074        ]);
1075
1076        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1077
1078        let stream = builder
1079            .with_projection(mask.clone())
1080            .with_row_selection(selection.clone())
1081            .build()
1082            .expect("building stream");
1083
1084        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1085
1086        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1087            .unwrap()
1088            .with_projection(mask)
1089            .with_batch_size(1024)
1090            .with_row_selection(selection)
1091            .build()
1092            .unwrap()
1093            .collect::<ArrowResult<Vec<_>>>()
1094            .unwrap();
1095
1096        assert_eq!(async_batches, sync_batches);
1097    }
1098
1099    #[tokio::test]
1100    async fn test_fuzz_async_reader_selection() {
1101        let testdata = arrow::util::test_util::parquet_test_data();
1102        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1103        let data = Bytes::from(std::fs::read(path).unwrap());
1104
1105        let mut rand = rng();
1106
1107        for _ in 0..100 {
1108            let mut expected_rows = 0;
1109            let mut total_rows = 0;
1110            let mut skip = false;
1111            let mut selectors = vec![];
1112
1113            while total_rows < 7300 {
1114                let row_count: usize = rand.random_range(1..100);
1115
1116                let row_count = row_count.min(7300 - total_rows);
1117
1118                selectors.push(RowSelector { row_count, skip });
1119
1120                total_rows += row_count;
1121                if !skip {
1122                    expected_rows += row_count;
1123                }
1124
1125                skip = !skip;
1126            }
1127
1128            let selection = RowSelection::from(selectors);
1129
1130            let async_reader = TestReader::new(data.clone());
1131
1132            let options = ArrowReaderOptions::new().with_page_index(true);
1133            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1134                .await
1135                .unwrap();
1136
1137            assert_eq!(builder.metadata().num_row_groups(), 1);
1138
1139            let col_idx: usize = rand.random_range(0..13);
1140            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1141
1142            let stream = builder
1143                .with_projection(mask.clone())
1144                .with_row_selection(selection.clone())
1145                .build()
1146                .expect("building stream");
1147
1148            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1149
1150            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1151
1152            assert_eq!(actual_rows, expected_rows);
1153        }
1154    }
1155
1156    #[tokio::test]
1157    async fn test_async_reader_zero_row_selector() {
1158        //See https://github.com/apache/arrow-rs/issues/2669
1159        let testdata = arrow::util::test_util::parquet_test_data();
1160        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1161        let data = Bytes::from(std::fs::read(path).unwrap());
1162
1163        let mut rand = rng();
1164
1165        let mut expected_rows = 0;
1166        let mut total_rows = 0;
1167        let mut skip = false;
1168        let mut selectors = vec![];
1169
1170        selectors.push(RowSelector {
1171            row_count: 0,
1172            skip: false,
1173        });
1174
1175        while total_rows < 7300 {
1176            let row_count: usize = rand.random_range(1..100);
1177
1178            let row_count = row_count.min(7300 - total_rows);
1179
1180            selectors.push(RowSelector { row_count, skip });
1181
1182            total_rows += row_count;
1183            if !skip {
1184                expected_rows += row_count;
1185            }
1186
1187            skip = !skip;
1188        }
1189
1190        let selection = RowSelection::from(selectors);
1191
1192        let async_reader = TestReader::new(data.clone());
1193
1194        let options = ArrowReaderOptions::new().with_page_index(true);
1195        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1196            .await
1197            .unwrap();
1198
1199        assert_eq!(builder.metadata().num_row_groups(), 1);
1200
1201        let col_idx: usize = rand.random_range(0..13);
1202        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1203
1204        let stream = builder
1205            .with_projection(mask.clone())
1206            .with_row_selection(selection.clone())
1207            .build()
1208            .expect("building stream");
1209
1210        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1211
1212        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1213
1214        assert_eq!(actual_rows, expected_rows);
1215    }
1216
1217    #[tokio::test]
1218    async fn test_row_filter_full_page_skip_is_handled_async() {
1219        let first_value: i64 = 1111;
1220        let last_value: i64 = 9999;
1221        let num_rows: usize = 12;
1222
1223        // build data with row selection average length 4
1224        // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999)
1225        // The Row Selection would be [1111, (skip 10), 9999]
1226        let schema = Arc::new(Schema::new(vec![
1227            Field::new("key", DataType::Int64, false),
1228            Field::new("value", DataType::Int64, false),
1229        ]));
1230
1231        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
1232        int_values[0] = first_value;
1233        int_values[num_rows - 1] = last_value;
1234        let keys = Int64Array::from(int_values.clone());
1235        let values = Int64Array::from(int_values.clone());
1236        let batch = RecordBatch::try_new(
1237            Arc::clone(&schema),
1238            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
1239        )
1240        .unwrap();
1241
1242        let props = WriterProperties::builder()
1243            .set_write_batch_size(2)
1244            .set_data_page_row_count_limit(2)
1245            .build();
1246
1247        let mut buffer = Vec::new();
1248        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
1249        writer.write(&batch).unwrap();
1250        writer.close().unwrap();
1251        let data = Bytes::from(buffer);
1252
1253        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
1254            TestReader::new(data.clone()),
1255            ArrowReaderOptions::new().with_page_index(true),
1256        )
1257        .await
1258        .unwrap();
1259        let schema = builder.parquet_schema().clone();
1260        let filter_mask = ProjectionMask::leaves(&schema, [0]);
1261
1262        let make_predicate = |mask: ProjectionMask| {
1263            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
1264                let column = batch.column(0);
1265                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
1266                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
1267                or(&match_first, &match_second)
1268            })
1269        };
1270
1271        let predicate = make_predicate(filter_mask.clone());
1272
1273        // The batch size is set to 12 to read all rows in one go after filtering
1274        // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded.
1275        let stream = ParquetRecordBatchStreamBuilder::new_with_options(
1276            TestReader::new(data.clone()),
1277            ArrowReaderOptions::new().with_page_index(true),
1278        )
1279        .await
1280        .unwrap()
1281        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1282        .with_batch_size(12)
1283        .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
1284        .build()
1285        .unwrap();
1286
1287        let schema = stream.schema().clone();
1288        let batches: Vec<_> = stream.try_collect().await.unwrap();
1289        let result = concat_batches(&schema, &batches).unwrap();
1290        assert_eq!(result.num_rows(), 2);
1291    }
1292
1293    #[tokio::test]
1294    async fn test_row_filter() {
1295        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1296        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1297        let data = RecordBatch::try_from_iter([
1298            ("a", Arc::new(a) as ArrayRef),
1299            ("b", Arc::new(b) as ArrayRef),
1300        ])
1301        .unwrap();
1302
1303        let mut buf = Vec::with_capacity(1024);
1304        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1305        writer.write(&data).unwrap();
1306        writer.close().unwrap();
1307
1308        let data: Bytes = buf.into();
1309        let metadata = ParquetMetaDataReader::new()
1310            .parse_and_finish(&data)
1311            .unwrap();
1312        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1313
1314        let test = TestReader::new(data);
1315        let requests = test.requests.clone();
1316
1317        let a_scalar = StringArray::from_iter_values(["b"]);
1318        let a_filter = ArrowPredicateFn::new(
1319            ProjectionMask::leaves(&parquet_schema, vec![0]),
1320            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1321        );
1322
1323        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1324
1325        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1326        let stream = ParquetRecordBatchStreamBuilder::new(test)
1327            .await
1328            .unwrap()
1329            .with_projection(mask.clone())
1330            .with_batch_size(1024)
1331            .with_row_filter(filter)
1332            .build()
1333            .unwrap();
1334
1335        let batches: Vec<_> = stream.try_collect().await.unwrap();
1336        assert_eq!(batches.len(), 1);
1337
1338        let batch = &batches[0];
1339        assert_eq!(batch.num_columns(), 2);
1340
1341        // Filter should have kept only rows with "b" in column 0
1342        assert_eq!(
1343            batch.column(0).as_ref(),
1344            &StringArray::from_iter_values(["b", "b", "b"])
1345        );
1346        assert_eq!(
1347            batch.column(1).as_ref(),
1348            &StringArray::from_iter_values(["2", "3", "4"])
1349        );
1350
1351        // Should only have made 2 requests:
1352        // * First request fetches data for evaluating the predicate
1353        // * Second request fetches data for evaluating the projection
1354        assert_eq!(requests.lock().unwrap().len(), 2);
1355    }
1356
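    // Two chained predicates (`a == "b"`, then `b == "4"`): each predicate and
    // the final projection trigger their own fetch, three requests in total.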
1357    #[tokio::test]
1358    async fn test_two_row_filters() {
1359        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1360        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1361        let c = Int32Array::from_iter(0..6);
1362        let data = RecordBatch::try_from_iter([
1363            ("a", Arc::new(a) as ArrayRef),
1364            ("b", Arc::new(b) as ArrayRef),
1365            ("c", Arc::new(c) as ArrayRef),
1366        ])
1367        .unwrap();
1368
1369        let mut buf = Vec::with_capacity(1024);
1370        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1371        writer.write(&data).unwrap();
1372        writer.close().unwrap();
1373
1374        let data: Bytes = buf.into();
1375        let metadata = ParquetMetaDataReader::new()
1376            .parse_and_finish(&data)
1377            .unwrap();
1378        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1379
1380        let test = TestReader::new(data);
1381        let requests = test.requests.clone();
1382
1383        let a_scalar = StringArray::from_iter_values(["b"]);
1384        let a_filter = ArrowPredicateFn::new(
1385            ProjectionMask::leaves(&parquet_schema, vec![0]),
1386            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1387        );
1388
1389        let b_scalar = StringArray::from_iter_values(["4"]);
1390        let b_filter = ArrowPredicateFn::new(
1391            ProjectionMask::leaves(&parquet_schema, vec![1]),
1392            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1393        );
1394
1395        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1396
1397        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1398        let stream = ParquetRecordBatchStreamBuilder::new(test)
1399            .await
1400            .unwrap()
1401            .with_projection(mask.clone())
1402            .with_batch_size(1024)
1403            .with_row_filter(filter)
1404            .build()
1405            .unwrap();
1406
1407        let batches: Vec<_> = stream.try_collect().await.unwrap();
1408        assert_eq!(batches.len(), 1);
1409
1410        let batch = &batches[0];
1411        assert_eq!(batch.num_rows(), 1);
1412        assert_eq!(batch.num_columns(), 2);
1413
1414        let col = batch.column(0);
1415        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1416        assert_eq!(val, "b");
1417
1418        let col = batch.column(1);
1419        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1420        assert_eq!(val, 3);
1421
1422        // Should only have made 3 requests
1423        // * First request fetches data for evaluating the first predicate
1424        // * Second request fetches data for evaluating the second predicate
1425        // * Third request fetches data for evaluating the projection
1426        assert_eq!(requests.lock().unwrap().len(), 3);
1427    }
1428
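    // Exercises `with_limit` and `with_offset` across two row groups of three
    // rows each, checking exactly which rows end up in the output batches.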
1429    #[tokio::test]
1430    async fn test_limit_multiple_row_groups() {
1431        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1432        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1433        let c = Int32Array::from_iter(0..6);
1434        let data = RecordBatch::try_from_iter([
1435            ("a", Arc::new(a) as ArrayRef),
1436            ("b", Arc::new(b) as ArrayRef),
1437            ("c", Arc::new(c) as ArrayRef),
1438        ])
1439        .unwrap();
1440
1441        let mut buf = Vec::with_capacity(1024);
1442        let props = WriterProperties::builder()
1443            .set_max_row_group_size(3)
1444            .build();
1445        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1446        writer.write(&data).unwrap();
1447        writer.close().unwrap();
1448
1449        let data: Bytes = buf.into();
1450        let metadata = ParquetMetaDataReader::new()
1451            .parse_and_finish(&data)
1452            .unwrap();
1453
1454        assert_eq!(metadata.num_row_groups(), 2);
1455
1456        let test = TestReader::new(data);
1457
1458        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1459            .await
1460            .unwrap()
1461            .with_batch_size(1024)
1462            .with_limit(4)
1463            .build()
1464            .unwrap();
1465
1466        let batches: Vec<_> = stream.try_collect().await.unwrap();
1467        // Expect one batch for each row group
1468        assert_eq!(batches.len(), 2);
1469
1470        let batch = &batches[0];
1471        // First batch should contain all rows of the first row group
1472        assert_eq!(batch.num_rows(), 3);
1473        assert_eq!(batch.num_columns(), 3);
1474        let col2 = batch.column(2).as_primitive::<Int32Type>();
1475        assert_eq!(col2.values(), &[0, 1, 2]);
1476
1477        let batch = &batches[1];
1478        // Second batch should trigger the limit and only have one row
1479        assert_eq!(batch.num_rows(), 1);
1480        assert_eq!(batch.num_columns(), 3);
1481        let col2 = batch.column(2).as_primitive::<Int32Type>();
1482        assert_eq!(col2.values(), &[3]);
1483
1484        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1485            .await
1486            .unwrap()
1487            .with_offset(2)
1488            .with_limit(3)
1489            .build()
1490            .unwrap();
1491
1492        let batches: Vec<_> = stream.try_collect().await.unwrap();
1493        // Expect one batch for each row group
1494        assert_eq!(batches.len(), 2);
1495
1496        let batch = &batches[0];
1497        // First batch should contain one row
1498        assert_eq!(batch.num_rows(), 1);
1499        assert_eq!(batch.num_columns(), 3);
1500        let col2 = batch.column(2).as_primitive::<Int32Type>();
1501        assert_eq!(col2.values(), &[2]);
1502
1503        let batch = &batches[1];
1504        // Second batch should contain two rows
1505        assert_eq!(batch.num_rows(), 2);
1506        assert_eq!(batch.num_columns(), 3);
1507        let col2 = batch.column(2).as_primitive::<Int32Type>();
1508        assert_eq!(col2.values(), &[3, 4]);
1509
1510        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1511            .await
1512            .unwrap()
1513            .with_offset(4)
1514            .with_limit(20)
1515            .build()
1516            .unwrap();
1517
1518        let batches: Vec<_> = stream.try_collect().await.unwrap();
1519        // Should skip first row group
1520        assert_eq!(batches.len(), 1);
1521
1522        let batch = &batches[0];
1523        // First batch should contain two rows
1524        assert_eq!(batch.num_rows(), 2);
1525        assert_eq!(batch.num_columns(), 3);
1526        let col2 = batch.column(2).as_primitive::<Int32Type>();
1527        assert_eq!(col2.values(), &[4, 5]);
1528    }
1529
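    // Applies two predicates with the page index enabled and checks the total
    // number of rows that survive filtering.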
1530    #[tokio::test]
1531    async fn test_row_filter_with_index() {
1532        let testdata = arrow::util::test_util::parquet_test_data();
1533        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1534        let data = Bytes::from(std::fs::read(path).unwrap());
1535
1536        let metadata = ParquetMetaDataReader::new()
1537            .parse_and_finish(&data)
1538            .unwrap();
1539        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1540
1541        assert_eq!(metadata.num_row_groups(), 1);
1542
1543        let async_reader = TestReader::new(data.clone());
1544
1545        let a_filter =
1546            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1547                Ok(batch.column(0).as_boolean().clone())
1548            });
1549
1550        let b_scalar = Int8Array::from(vec![2]);
1551        let b_filter = ArrowPredicateFn::new(
1552            ProjectionMask::leaves(&parquet_schema, vec![2]),
1553            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1554        );
1555
1556        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1557
1558        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1559
1560        let options = ArrowReaderOptions::new().with_page_index(true);
1561        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1562            .await
1563            .unwrap()
1564            .with_projection(mask.clone())
1565            .with_batch_size(1024)
1566            .with_row_filter(filter)
1567            .build()
1568            .unwrap();
1569
1570        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1571
1572        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1573
1574        assert_eq!(total_rows, 730);
1575    }
1576
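    // The configured batch size should be clamped to the number of rows in the
    // file so the reader does not allocate oversized batches.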
1577    #[tokio::test]
1578    async fn test_batch_size_overallocate() {
1579        let testdata = arrow::util::test_util::parquet_test_data();
1580        // `alltypes_plain.parquet` has only 8 rows
1581        let path = format!("{testdata}/alltypes_plain.parquet");
1582        let data = Bytes::from(std::fs::read(path).unwrap());
1583
1584        let async_reader = TestReader::new(data.clone());
1585
1586        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1587            .await
1588            .unwrap();
1589
1590        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1591
1592        let builder = builder
1593            .with_projection(ProjectionMask::all())
1594            .with_batch_size(1024);
1595
1596        // even though the batch size is set to 1024, it should adjust to the max
1597        // number of rows in the file (8)
1598        assert_ne!(1024, file_rows);
1599        assert_eq!(builder.batch_size, file_rows);
1600
1601        let _stream = builder.build().unwrap();
1602    }
1603
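    // Reads a bloom filter from a file whose column metadata does not record
    // `bloom_filter_length` (see the shared helper below).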
1604    #[tokio::test]
1605    async fn test_get_row_group_column_bloom_filter_without_length() {
1606        let testdata = arrow::util::test_util::parquet_test_data();
1607        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1608        let data = Bytes::from(std::fs::read(path).unwrap());
1609        test_get_row_group_column_bloom_filter(data, false).await;
1610    }
1611
1612    #[tokio::test]
1613    async fn test_parquet_record_batch_stream_schema() {
1614        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1615            schema.flattened_fields().iter().map(|f| f.name()).collect()
1616        }
1617
1618        // ParquetRecordBatchReaderBuilder::schema differs from
1619        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
1620        // schema contents (the custom metadata attached to the schema, and the fields
1621        // returned). This test ensures that behaviour remains consistent.
1622        //
1623        // The same checks are applied to the asynchronous versions of the above.
1624
1625        // Prepare data for a schema with nested fields and custom metadata
1626        let mut metadata = HashMap::with_capacity(1);
1627        metadata.insert("key".to_string(), "value".to_string());
1628
1629        let nested_struct_array = StructArray::from(vec![
1630            (
1631                Arc::new(Field::new("d", DataType::Utf8, true)),
1632                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1633            ),
1634            (
1635                Arc::new(Field::new("e", DataType::Utf8, true)),
1636                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1637            ),
1638        ]);
1639        let struct_array = StructArray::from(vec![
1640            (
1641                Arc::new(Field::new("a", DataType::Int32, true)),
1642                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1643            ),
1644            (
1645                Arc::new(Field::new("b", DataType::UInt64, true)),
1646                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1647            ),
1648            (
1649                Arc::new(Field::new(
1650                    "c",
1651                    nested_struct_array.data_type().clone(),
1652                    true,
1653                )),
1654                Arc::new(nested_struct_array) as ArrayRef,
1655            ),
1656        ]);
1657
1658        let schema =
1659            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1660        let record_batch = RecordBatch::from(struct_array)
1661            .with_schema(schema.clone())
1662            .unwrap();
1663
1664        // Write parquet with custom metadata in schema
1665        let mut file = tempfile().unwrap();
1666        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1667        writer.write(&record_batch).unwrap();
1668        writer.close().unwrap();
1669
1670        let all_fields = ["a", "b", "c", "d", "e"];
1671        // (leaf indices in the projection mask, expected names of all fields in the output schema)
1672        let projections = [
1673            (vec![], vec![]),
1674            (vec![0], vec!["a"]),
1675            (vec![0, 1], vec!["a", "b"]),
1676            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1677            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1678        ];
1679
1680        // Ensure we're consistent for each of these projections
1681        for (indices, expected_projected_names) in projections {
1682            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1683                // Builder schema should preserve all fields and metadata
1684                assert_eq!(get_all_field_names(&builder), all_fields);
1685                assert_eq!(builder.metadata, metadata);
1686                // Reader & batch schema should show only projected fields, and no metadata
1687                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1688                assert_eq!(reader.metadata, HashMap::default());
1689                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1690                assert_eq!(batch.metadata, HashMap::default());
1691            };
1692
1693            let builder =
1694                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1695            let sync_builder_schema = builder.schema().clone();
1696            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1697            let mut reader = builder.with_projection(mask).build().unwrap();
1698            let sync_reader_schema = reader.schema();
1699            let batch = reader.next().unwrap().unwrap();
1700            let sync_batch_schema = batch.schema();
1701            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1702
1703            // The asynchronous reader should behave the same way
1704            let file = tokio::fs::File::from(file.try_clone().unwrap());
1705            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1706            let async_builder_schema = builder.schema().clone();
1707            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1708            let mut reader = builder.with_projection(mask).build().unwrap();
1709            let async_reader_schema = reader.schema().clone();
1710            let batch = reader.next().await.unwrap().unwrap();
1711            let async_batch_schema = batch.schema();
1712            assert_schemas(
1713                async_builder_schema,
1714                async_reader_schema,
1715                async_batch_schema,
1716            );
1717        }
1718    }
1719
1720    #[tokio::test]
1721    async fn test_get_row_group_column_bloom_filter_with_length() {
1722        // Rewrite into a new parquet file whose metadata records bloom_filter_length
1723        let testdata = arrow::util::test_util::parquet_test_data();
1724        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1725        let data = Bytes::from(std::fs::read(path).unwrap());
1726        let async_reader = TestReader::new(data.clone());
1727        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1728            .await
1729            .unwrap();
1730        let schema = builder.schema().clone();
1731        let stream = builder.build().unwrap();
1732        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1733
1734        let mut parquet_data = Vec::new();
1735        let props = WriterProperties::builder()
1736            .set_bloom_filter_enabled(true)
1737            .build();
1738        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1739        for batch in batches {
1740            writer.write(&batch).unwrap();
1741        }
1742        writer.close().unwrap();
1743
1744        // test the new parquet file
1745        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1746    }
1747
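    // Shared helper: asserts that the presence of `bloom_filter_length` matches
    // `with_length`, then checks bloom filter membership for a present and an
    // absent value.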
1748    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1749        let async_reader = TestReader::new(data.clone());
1750
1751        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1752            .await
1753            .unwrap();
1754
1755        let metadata = builder.metadata();
1756        assert_eq!(metadata.num_row_groups(), 1);
1757        let row_group = metadata.row_group(0);
1758        let column = row_group.column(0);
1759        assert_eq!(column.bloom_filter_length().is_some(), with_length);
1760
1761        let sbbf = builder
1762            .get_row_group_column_bloom_filter(0, 0)
1763            .await
1764            .unwrap()
1765            .unwrap();
1766        assert!(sbbf.check(&"Hello"));
1767        assert!(!sbbf.check(&"Hello_Not_Exists"));
1768    }
1769
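    // Applies several skip/select patterns to a file with a nested list column
    // and many small pages, verifying the number of rows read for each pattern.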
1770    #[tokio::test]
1771    async fn test_nested_skip() {
1772        let schema = Arc::new(Schema::new(vec![
1773            Field::new("col_1", DataType::UInt64, false),
1774            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1775        ]));
1776
1777        // Writer properties forcing small pages and row groups
1778        let props = WriterProperties::builder()
1779            .set_data_page_row_count_limit(256)
1780            .set_write_batch_size(256)
1781            .set_max_row_group_size(1024);
1782
1783        // Write data
1784        let mut file = tempfile().unwrap();
1785        let mut writer =
1786            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1787
1788        let mut builder = ListBuilder::new(StringBuilder::new());
1789        for id in 0..1024 {
1790            match id % 3 {
1791                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1792                1 => builder.append_value([Some(format!("id_{id}"))]),
1793                _ => builder.append_null(),
1794            }
1795        }
1796        let refs = vec![
1797            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1798            Arc::new(builder.finish()) as ArrayRef,
1799        ];
1800
1801        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1802        writer.write(&batch).unwrap();
1803        writer.close().unwrap();
1804
1805        let selections = [
1806            RowSelection::from(vec![
1807                RowSelector::skip(313),
1808                RowSelector::select(1),
1809                RowSelector::skip(709),
1810                RowSelector::select(1),
1811            ]),
1812            RowSelection::from(vec![
1813                RowSelector::skip(255),
1814                RowSelector::select(1),
1815                RowSelector::skip(767),
1816                RowSelector::select(1),
1817            ]),
1818            RowSelection::from(vec![
1819                RowSelector::select(255),
1820                RowSelector::skip(1),
1821                RowSelector::select(767),
1822                RowSelector::skip(1),
1823            ]),
1824            RowSelection::from(vec![
1825                RowSelector::skip(254),
1826                RowSelector::select(1),
1827                RowSelector::select(1),
1828                RowSelector::skip(767),
1829                RowSelector::select(1),
1830            ]),
1831        ];
1832
1833        for selection in selections {
1834            let expected = selection.row_count();
1835            // Read data
1836            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1837                tokio::fs::File::from_std(file.try_clone().unwrap()),
1838                ArrowReaderOptions::new().with_page_index(true),
1839            )
1840            .await
1841            .unwrap();
1842
1843            reader = reader.with_row_selection(selection);
1844
1845            let mut stream = reader.build().unwrap();
1846
1847            let mut total_rows = 0;
1848            while let Some(rb) = stream.next().await {
1849                let rb = rb.unwrap();
1850                total_rows += rb.num_rows();
1851            }
1852            assert_eq!(total_rows, expected);
1853        }
1854    }
1855
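    // Like `test_two_row_filters`, but the second predicate evaluates a field
    // nested inside a struct column.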
1856    #[tokio::test]
1857    async fn test_row_filter_nested() {
1858        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1859        let b = StructArray::from(vec![
1860            (
1861                Arc::new(Field::new("aa", DataType::Utf8, true)),
1862                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1863            ),
1864            (
1865                Arc::new(Field::new("bb", DataType::Utf8, true)),
1866                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1867            ),
1868        ]);
1869        let c = Int32Array::from_iter(0..6);
1870        let data = RecordBatch::try_from_iter([
1871            ("a", Arc::new(a) as ArrayRef),
1872            ("b", Arc::new(b) as ArrayRef),
1873            ("c", Arc::new(c) as ArrayRef),
1874        ])
1875        .unwrap();
1876
1877        let mut buf = Vec::with_capacity(1024);
1878        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1879        writer.write(&data).unwrap();
1880        writer.close().unwrap();
1881
1882        let data: Bytes = buf.into();
1883        let metadata = ParquetMetaDataReader::new()
1884            .parse_and_finish(&data)
1885            .unwrap();
1886        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1887
1888        let test = TestReader::new(data);
1889        let requests = test.requests.clone();
1890
1891        let a_scalar = StringArray::from_iter_values(["b"]);
1892        let a_filter = ArrowPredicateFn::new(
1893            ProjectionMask::leaves(&parquet_schema, vec![0]),
1894            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1895        );
1896
1897        let b_scalar = StringArray::from_iter_values(["4"]);
1898        let b_filter = ArrowPredicateFn::new(
1899            ProjectionMask::leaves(&parquet_schema, vec![2]),
1900            move |batch| {
1901                // Filter on the second field (`bb`) of the struct; only leaf 2 is projected for this predicate, so it is the struct's first child here.
1902                let struct_array = batch
1903                    .column(0)
1904                    .as_any()
1905                    .downcast_ref::<StructArray>()
1906                    .unwrap();
1907                eq(struct_array.column(0), &Scalar::new(&b_scalar))
1908            },
1909        );
1910
1911        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1912
1913        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
1914        let stream = ParquetRecordBatchStreamBuilder::new(test)
1915            .await
1916            .unwrap()
1917            .with_projection(mask.clone())
1918            .with_batch_size(1024)
1919            .with_row_filter(filter)
1920            .build()
1921            .unwrap();
1922
1923        let batches: Vec<_> = stream.try_collect().await.unwrap();
1924        assert_eq!(batches.len(), 1);
1925
1926        let batch = &batches[0];
1927        assert_eq!(batch.num_rows(), 1);
1928        assert_eq!(batch.num_columns(), 2);
1929
1930        let col = batch.column(0);
1931        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1932        assert_eq!(val, "b");
1933
1934        let col = batch.column(1);
1935        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1936        assert_eq!(val, 3);
1937
1938        // Should only have made 3 requests
1939        // * First request fetches data for evaluating the first predicate
1940        // * Second request fetches data for evaluating the second predicate
1941        // * Third request fetches data for evaluating the projection
1942        assert_eq!(requests.lock().unwrap().len(), 3);
1943    }
1944
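    // Setting an explicitly empty offset index on the metadata must not cause
    // the reader to panic.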
1945    #[tokio::test]
1946    #[allow(deprecated)]
1947    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1948        use tokio::fs::File;
1949        let testdata = arrow::util::test_util::parquet_test_data();
1950        let path = format!("{testdata}/alltypes_plain.parquet");
1951        let mut file = File::open(&path).await.unwrap();
1952        let file_size = file.metadata().await.unwrap().len();
1953        let mut metadata = ParquetMetaDataReader::new()
1954            .with_page_indexes(true)
1955            .load_and_finish(&mut file, file_size)
1956            .await
1957            .unwrap();
1958
1959        metadata.set_offset_index(Some(vec![]));
1960        let options = ArrowReaderOptions::new().with_page_index(true);
1961        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1962        let reader =
1963            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1964                .build()
1965                .unwrap();
1966
1967        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1968        assert_eq!(result.len(), 1);
1969    }
1970
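    // Reading with a populated offset index loaded up front must not panic.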
1971    #[tokio::test]
1972    #[allow(deprecated)]
1973    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1974        use tokio::fs::File;
1975        let testdata = arrow::util::test_util::parquet_test_data();
1976        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1977        let mut file = File::open(&path).await.unwrap();
1978        let file_size = file.metadata().await.unwrap().len();
1979        let metadata = ParquetMetaDataReader::new()
1980            .with_page_indexes(true)
1981            .load_and_finish(&mut file, file_size)
1982            .await
1983            .unwrap();
1984
1985        let options = ArrowReaderOptions::new().with_page_index(true);
1986        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1987        let reader =
1988            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1989                .build()
1990                .unwrap();
1991
1992        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1993        assert_eq!(result.len(), 8);
1994    }
1995
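    // Round-trips the metadata through ParquetMetaDataWriter and
    // ParquetMetaDataReader on disk, then reads the file with it; reading the
    // column chunks must not panic.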
1996    #[tokio::test]
1997    #[allow(deprecated)]
1998    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1999        use tempfile::TempDir;
2000        use tokio::fs::File;
2001        fn write_metadata_to_local_file(
2002            metadata: ParquetMetaData,
2003            file: impl AsRef<std::path::Path>,
2004        ) {
2005            use crate::file::metadata::ParquetMetaDataWriter;
2006            use std::fs::File;
2007            let file = File::create(file).unwrap();
2008            ParquetMetaDataWriter::new(file, &metadata)
2009                .finish()
2010                .unwrap()
2011        }
2012
2013        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2014            use std::fs::File;
2015            let file = File::open(file).unwrap();
2016            ParquetMetaDataReader::new()
2017                .with_page_indexes(true)
2018                .parse_and_finish(&file)
2019                .unwrap()
2020        }
2021
2022        let testdata = arrow::util::test_util::parquet_test_data();
2023        let path = format!("{testdata}/alltypes_plain.parquet");
2024        let mut file = File::open(&path).await.unwrap();
2025        let file_size = file.metadata().await.unwrap().len();
2026        let metadata = ParquetMetaDataReader::new()
2027            .with_page_indexes(true)
2028            .load_and_finish(&mut file, file_size)
2029            .await
2030            .unwrap();
2031
2032        let tempdir = TempDir::new().unwrap();
2033        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2034        write_metadata_to_local_file(metadata, &metadata_path);
2035        let metadata = read_metadata_from_local_file(&metadata_path);
2036
2037        let options = ArrowReaderOptions::new().with_page_index(true);
2038        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2039        let reader =
2040            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2041                .build()
2042                .unwrap();
2043
2044        // Reading the data must not panic here
2045        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2046        assert_eq!(result.len(), 1);
2047    }
2048
2049    #[tokio::test]
2050    async fn test_cached_array_reader_sparse_offset_error() {
2051        use futures::TryStreamExt;
2052
2053        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2054        use arrow_array::{BooleanArray, RecordBatch};
2055
2056        let testdata = arrow::util::test_util::parquet_test_data();
2057        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2058        let data = Bytes::from(std::fs::read(path).unwrap());
2059
2060        let async_reader = TestReader::new(data);
2061
2062        // Enable page index so the fetch logic loads only required pages
2063        let options = ArrowReaderOptions::new().with_page_index(true);
2064        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2065            .await
2066            .unwrap();
2067
2068        // Skip the first 22 rows (entire first Parquet page) and then select the
2069        // next 3 rows (22, 23, 24). This means the fetch step will not include
2070        // the first page starting at file offset 0.
2071        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2072
2073        // Trivial predicate on column 0 that always returns `true`. Using the
2074        // same column in both predicate and projection activates the caching
2075        // layer (Producer/Consumer pattern).
2076        let parquet_schema = builder.parquet_schema();
2077        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2078        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2079            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2080        });
2081        let filter = RowFilter::new(vec![Box::new(always_true)]);
2082
2083        // Build the stream with batch size 8 so the cache reads whole batches
2084        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
2085        let stream = builder
2086            .with_batch_size(8)
2087            .with_projection(proj)
2088            .with_row_selection(selection)
2089            .with_row_filter(filter)
2090            .build()
2091            .unwrap();
2092
2093        // Collecting the stream must succeed; this setup exercises the sparse
2094        // column chunk offset path, and the unwrap would surface any such error.
2095        let _result: Vec<_> = stream.try_collect().await.unwrap();
2096    }
2097
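    // Runs the same filtered read with and without the predicate cache
    // (disabled via `with_max_predicate_cache_size(0)`): the results must match
    // while the number of requests and bytes fetched differ.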
2098    #[tokio::test]
2099    async fn test_predicate_cache_disabled() {
2100        let k = Int32Array::from_iter_values(0..10);
2101        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2102
2103        let mut buf = Vec::new();
2104        // both the page row limit and batch size are set to 1 to create one page per row
2105        let props = WriterProperties::builder()
2106            .set_data_page_row_count_limit(1)
2107            .set_write_batch_size(1)
2108            .set_max_row_group_size(10)
2109            .set_write_page_header_statistics(true)
2110            .build();
2111        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2112        writer.write(&data).unwrap();
2113        writer.close().unwrap();
2114
2115        let data = Bytes::from(buf);
2116        let metadata = ParquetMetaDataReader::new()
2117            .with_page_index_policy(PageIndexPolicy::Required)
2118            .parse_and_finish(&data)
2119            .unwrap();
2120        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2121
2122        // the filter is not clone-able, so we use a closure to build one for each reader
2123        let build_filter = || {
2124            let scalar = Int32Array::from_iter_values([5]);
2125            let predicate = ArrowPredicateFn::new(
2126                ProjectionMask::leaves(&parquet_schema, vec![0]),
2127                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2128            );
2129            RowFilter::new(vec![Box::new(predicate)])
2130        };
2131
2132        // select only one of the pages
2133        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2134
2135        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2136        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2137
2138        // using the predicate cache (default)
2139        let reader_with_cache = TestReader::new(data.clone());
2140        let requests_with_cache = reader_with_cache.requests.clone();
2141        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2142            reader_with_cache,
2143            reader_metadata.clone(),
2144        )
2145        .with_batch_size(1000)
2146        .with_row_selection(selection.clone())
2147        .with_row_filter(build_filter())
2148        .build()
2149        .unwrap();
2150        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2151
2152        // disabling the predicate cache
2153        let reader_without_cache = TestReader::new(data);
2154        let requests_without_cache = reader_without_cache.requests.clone();
2155        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2156            reader_without_cache,
2157            reader_metadata,
2158        )
2159        .with_batch_size(1000)
2160        .with_row_selection(selection)
2161        .with_row_filter(build_filter())
2162        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
2163        .build()
2164        .unwrap();
2165        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2166
2167        assert_eq!(batches_with_cache, batches_without_cache);
2168
2169        let requests_with_cache = requests_with_cache.lock().unwrap();
2170        let requests_without_cache = requests_without_cache.lock().unwrap();
2171
2172        // fewer requests will be made without the predicate cache
2173        assert_eq!(requests_with_cache.len(), 11);
2174        assert_eq!(requests_without_cache.len(), 2);
2175
2176        // fewer bytes will be retrieved without the predicate cache
2177        assert_eq!(
2178            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2179            433
2180        );
2181        assert_eq!(
2182            requests_without_cache
2183                .iter()
2184                .map(|r| r.len())
2185                .sum::<usize>(),
2186            92
2187        );
2188    }
2189
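    // Exercises the `row_number` virtual column through the async stream
    // builder, without a row filter.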
2190    #[test]
2191    fn test_row_numbers_with_multiple_row_groups() {
2192        test_row_numbers_with_multiple_row_groups_helper(
2193            false,
2194            |path, selection, _row_filter, batch_size| {
2195                let runtime = tokio::runtime::Builder::new_current_thread()
2196                    .enable_all()
2197                    .build()
2198                    .expect("Could not create runtime");
2199                runtime.block_on(async move {
2200                    let file = tokio::fs::File::open(path).await.unwrap();
2201                    let row_number_field = Arc::new(
2202                        Field::new("row_number", DataType::Int64, false)
2203                            .with_extension_type(RowNumber),
2204                    );
2205                    let options =
2206                        ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
2207                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2208                        .await
2209                        .unwrap()
2210                        .with_row_selection(selection)
2211                        .with_batch_size(batch_size)
2212                        .build()
2213                        .expect("Could not create reader");
2214                    reader.try_collect::<Vec<_>>().await.unwrap()
2215                })
2216            },
2217        );
2218    }
2219
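    // Same as above, but also applies the row filter supplied by the helper.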
2220    #[test]
2221    fn test_row_numbers_with_multiple_row_groups_and_filter() {
2222        test_row_numbers_with_multiple_row_groups_helper(
2223            true,
2224            |path, selection, row_filter, batch_size| {
2225                let runtime = tokio::runtime::Builder::new_current_thread()
2226                    .enable_all()
2227                    .build()
2228                    .expect("Could not create runtime");
2229                runtime.block_on(async move {
2230                    let file = tokio::fs::File::open(path).await.unwrap();
2231                    let row_number_field = Arc::new(
2232                        Field::new("row_number", DataType::Int64, false)
2233                            .with_extension_type(RowNumber),
2234                    );
2235                    let options =
2236                        ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
2237                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2238                        .await
2239                        .unwrap()
2240                        .with_row_selection(selection)
2241                        .with_row_filter(row_filter.expect("No row filter"))
2242                        .with_batch_size(batch_size)
2243                        .build()
2244                        .expect("Could not create reader");
2245                    reader.try_collect::<Vec<_>>().await.unwrap()
2246                })
2247            },
2248        );
2249    }
2250}