parquet/arrow/async_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{build_array_reader, RowGroups};
42use crate::arrow::arrow_reader::{
43    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
44    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
45};
46use crate::arrow::ProjectionMask;
47
48use crate::bloom_filter::{
49    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
50};
51use crate::column::page::{PageIterator, PageReader};
52use crate::errors::{ParquetError, Result};
53use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
54use crate::file::page_index::offset_index::OffsetIndexMetaData;
55use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
56use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
57
58mod metadata;
59pub use metadata::*;
60
61#[cfg(feature = "object_store")]
62mod store;
63
64use crate::arrow::schema::ParquetField;
65#[cfg(feature = "object_store")]
66pub use store::*;
67
68/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
69///
70/// Notes:
71///
72/// 1. There is a default implementation for types that implement [`AsyncRead`]
73///    and [`AsyncSeek`], for example [`tokio::fs::File`].
74///
75/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
76///    is enabled, implements this interface for [`ObjectStore`].
77///
78/// [`ObjectStore`]: object_store::ObjectStore
79///
80/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
81pub trait AsyncFileReader: Send {
82    /// Retrieve the bytes in `range`
83    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
84
85    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
86    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
87        async move {
88            let mut result = Vec::with_capacity(ranges.len());
89
90            for range in ranges.into_iter() {
91                let data = self.get_bytes(range).await?;
92                result.push(data);
93            }
94
95            Ok(result)
96        }
97        .boxed()
98    }
99
100    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
101    ///
102    /// This is an asynchronous operation as it may involve reading the file
103    /// footer and potentially other metadata from disk or a remote source.
104    ///
105    /// Reading data from Parquet requires the metadata to understand the
106    /// schema, row groups, and location of pages within the file. This metadata
107    /// is stored primarily in the footer of the Parquet file, and can be read using
108    /// [`ParquetMetaDataReader`].
109    ///
110    /// However, implementations can significantly speed up reading Parquet by
111    /// supplying cached metadata or pre-fetched metadata via this API.
112    ///
113    /// # Parameters
114    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
115    ///   and other options that affect how the metadata is read.
116    fn get_metadata<'a>(
117        &'a mut self,
118        options: Option<&'a ArrowReaderOptions>,
119    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
120}
121
122/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
123impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
124    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
125        self.as_mut().get_bytes(range)
126    }
127
128    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
129        self.as_mut().get_byte_ranges(ranges)
130    }
131
132    fn get_metadata<'a>(
133        &'a mut self,
134        options: Option<&'a ArrowReaderOptions>,
135    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
136        self.as_mut().get_metadata(options)
137    }
138}
139
140impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
141    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
142        async move {
143            self.seek(SeekFrom::End(-(suffix as i64))).await?;
144            let mut buf = Vec::with_capacity(suffix);
145            self.take(suffix as _).read_to_end(&mut buf).await?;
146            Ok(buf.into())
147        }
148        .boxed()
149    }
150}
151
152impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
153    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
154        async move {
155            self.seek(SeekFrom::Start(range.start)).await?;
156
157            let to_read = range.end - range.start;
158            let mut buffer = Vec::with_capacity(to_read.try_into()?);
159            let read = self.take(to_read).read_to_end(&mut buffer).await?;
160            if read as u64 != to_read {
161                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
162            }
163
164            Ok(buffer.into())
165        }
166        .boxed()
167    }
168
169    fn get_metadata<'a>(
170        &'a mut self,
171        options: Option<&'a ArrowReaderOptions>,
172    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
173        async move {
174            let metadata_reader = ParquetMetaDataReader::new()
175                .with_page_indexes(options.is_some_and(|o| o.page_index));
176
177            #[cfg(feature = "encryption")]
178            let metadata_reader = metadata_reader.with_decryption_properties(
179                options.and_then(|o| o.file_decryption_properties.as_ref()),
180            );
181
182            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183            Ok(Arc::new(parquet_metadata))
184        }
185        .boxed()
186    }
187}
188
189impl ArrowReaderMetadata {
190    /// Returns a new [`ArrowReaderMetadata`] for this builder
191    ///
192    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
193    pub async fn load_async<T: AsyncFileReader>(
194        input: &mut T,
195        options: ArrowReaderOptions,
196    ) -> Result<Self> {
197        let metadata = input.get_metadata(Some(&options)).await?;
198        Self::try_new(metadata, options)
199    }
200}
201
202#[doc(hidden)]
203/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
204///
205/// Allows sharing the same builder for both the sync and async versions, whilst also not
206/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
207pub struct AsyncReader<T>(T);
208
209/// A builder for reading parquet files from an `async` source as  [`ParquetRecordBatchStream`]
210///
211/// This can be used to decode a Parquet file in streaming fashion (without
212/// downloading the whole file at once) from a remote source, such as an object store.
213///
214/// This builder handles reading the parquet file metadata, allowing consumers
215/// to use this information to select what specific columns, row groups, etc.
216/// they wish to be read by the resulting stream.
217///
218/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
219///
220/// See [`ArrowReaderBuilder`] for additional member functions
221pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
222
223impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
224    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
225    /// specified source.
226    ///
227    /// # Example
228    /// ```
229    /// # #[tokio::main(flavor="current_thread")]
230    /// # async fn main() {
231    /// #
232    /// # use arrow_array::RecordBatch;
233    /// # use arrow::util::pretty::pretty_format_batches;
234    /// # use futures::TryStreamExt;
235    /// #
236    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
237    /// #
238    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
239    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
240    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
241    /// #     assert_eq!(
242    /// #          &actual_lines, expected_lines,
243    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
244    /// #          expected_lines, actual_lines
245    /// #      );
246    /// #  }
247    /// #
248    /// # let testdata = arrow::util::test_util::parquet_test_data();
249    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
250    /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
251    /// // another async I/O reader such as a reader from an object store.
252    /// let file = tokio::fs::File::open(path).await.unwrap();
253    ///
254    /// // Configure options for reading from the async source
255    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
256    ///     .await
257    ///     .unwrap();
258    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
259    /// // a stream that can be used to incrementally read the data in batches
260    /// let stream = builder.build().unwrap();
261    /// // In this example, we collect the stream into a Vec<RecordBatch>
262    /// // but real applications would likely process the batches as they are read
263    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
264    /// // Demonstrate the results are as expected
265    /// assert_batches_eq(
266    ///     &results,
267    ///     &[
268    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
269    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
270    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
271    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
272    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
273    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
274    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
275    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
276    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
277    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
278    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
279    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
280    ///      ],
281    ///  );
282    /// # }
283    /// ```
284    ///
285    /// # Example configuring options and reading metadata
286    ///
287    /// There are many options that control the behavior of the reader, such as
288    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
289    ///
290    /// ```
291    /// # #[tokio::main(flavor="current_thread")]
292    /// # async fn main() {
293    /// #
294    /// # use arrow_array::RecordBatch;
295    /// # use arrow::util::pretty::pretty_format_batches;
296    /// # use futures::TryStreamExt;
297    /// #
298    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
299    /// #
300    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
301    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
302    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
303    /// #     assert_eq!(
304    /// #          &actual_lines, expected_lines,
305    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
306    /// #          expected_lines, actual_lines
307    /// #      );
308    /// #  }
309    /// #
310    /// # let testdata = arrow::util::test_util::parquet_test_data();
311    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
312    /// // As before, use tokio::fs::File to read data using an async I/O.
313    /// let file = tokio::fs::File::open(path).await.unwrap();
314    ///
315    /// // Configure options for reading from the async source, in this case we set the batch size
316    /// // to 3 which produces 3 rows at a time.
317    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
318    ///     .await
319    ///     .unwrap()
320    ///     .with_batch_size(3);
321    ///
322    /// // We can also read the metadata to inspect the schema and other metadata
323    /// // before actually reading the data
324    /// let file_metadata = builder.metadata().file_metadata();
325    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
326    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
327    ///
328    /// let stream = builder.with_projection(mask).build().unwrap();
329    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
330    /// // Print out the results
331    /// assert_batches_eq(
332    ///     &results,
333    ///     &[
334    ///         "+----------+-------------+-----------+",
335    ///         "| bool_col | tinyint_col | float_col |",
336    ///         "+----------+-------------+-----------+",
337    ///         "| true     | 0           | 0.0       |",
338    ///         "| false    | 1           | 1.1       |",
339    ///         "| true     | 0           | 0.0       |",
340    ///         "| false    | 1           | 1.1       |",
341    ///         "| true     | 0           | 0.0       |",
342    ///         "| false    | 1           | 1.1       |",
343    ///         "| true     | 0           | 0.0       |",
344    ///         "| false    | 1           | 1.1       |",
345    ///         "+----------+-------------+-----------+",
346    ///      ],
347    ///  );
348    ///
349    /// // The results has 8 rows, so since we set the batch size to 3, we expect
350    /// // 3 batches, two with 3 rows each and the last batch with 2 rows.
351    /// assert_eq!(results.len(), 3);
352    /// # }
353    /// ```
354    pub async fn new(input: T) -> Result<Self> {
355        Self::new_with_options(input, Default::default()).await
356    }
357
358    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
359    /// and [`ArrowReaderOptions`].
360    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
361        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
362        Ok(Self::new_with_metadata(input, metadata))
363    }
364
365    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
366    ///
367    /// This allows loading metadata once and using it to create multiple builders with
368    /// potentially different settings, that can be read in parallel.
369    ///
370    /// # Example of reading from multiple streams in parallel
371    ///
372    /// ```
373    /// # use std::fs::metadata;
374    /// # use std::sync::Arc;
375    /// # use bytes::Bytes;
376    /// # use arrow_array::{Int32Array, RecordBatch};
377    /// # use arrow_schema::{DataType, Field, Schema};
378    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
379    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
380    /// # use tempfile::tempfile;
381    /// # use futures::StreamExt;
382    /// # #[tokio::main(flavor="current_thread")]
383    /// # async fn main() {
384    /// #
385    /// # let mut file = tempfile().unwrap();
386    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
387    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
388    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
389    /// # writer.write(&batch).unwrap();
390    /// # writer.close().unwrap();
391    /// // open file with parquet data
392    /// let mut file = tokio::fs::File::from_std(file);
393    /// // load metadata once
394    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
395    /// // create two readers, a and b, from the same underlying file
396    /// // without reading the metadata again
397    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
398    ///     file.try_clone().await.unwrap(),
399    ///     meta.clone()
400    /// ).build().unwrap();
401    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
402    ///
403    /// // Can read batches from both readers in parallel
404    /// assert_eq!(
405    ///   a.next().await.unwrap().unwrap(),
406    ///   b.next().await.unwrap().unwrap(),
407    /// );
408    /// # }
409    /// ```
410    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
411        Self::new_builder(AsyncReader(input), metadata)
412    }
413
414    /// Read bloom filter for a column in a row group
415    ///
416    /// Returns `None` if the column does not have a bloom filter
417    ///
418    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
419    pub async fn get_row_group_column_bloom_filter(
420        &mut self,
421        row_group_idx: usize,
422        column_idx: usize,
423    ) -> Result<Option<Sbbf>> {
424        let metadata = self.metadata.row_group(row_group_idx);
425        let column_metadata = metadata.column(column_idx);
426
427        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
428            offset
429                .try_into()
430                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
431        } else {
432            return Ok(None);
433        };
434
435        let buffer = match column_metadata.bloom_filter_length() {
436            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
437            None => self
438                .input
439                .0
440                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
441        }
442        .await?;
443
444        let (header, bitset_offset) =
445            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
446
447        match header.algorithm {
448            BloomFilterAlgorithm::BLOCK(_) => {
449                // this match exists to future proof the singleton algorithm enum
450            }
451        }
452        match header.compression {
453            BloomFilterCompression::UNCOMPRESSED(_) => {
454                // this match exists to future proof the singleton compression enum
455            }
456        }
457        match header.hash {
458            BloomFilterHash::XXHASH(_) => {
459                // this match exists to future proof the singleton hash enum
460            }
461        }
462
463        let bitset = match column_metadata.bloom_filter_length() {
464            Some(_) => buffer.slice(
465                (TryInto::<usize>::try_into(bitset_offset).unwrap()
466                    - TryInto::<usize>::try_into(offset).unwrap())..,
467            ),
468            None => {
469                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
470                    ParquetError::General("Bloom filter length is invalid".to_string())
471                })?;
472                self.input
473                    .0
474                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
475                    .await?
476            }
477        };
478        Ok(Some(Sbbf::new(&bitset)))
479    }
480
481    /// Build a new [`ParquetRecordBatchStream`]
482    ///
483    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
484    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
485        let num_row_groups = self.metadata.row_groups().len();
486
487        let row_groups = match self.row_groups {
488            Some(row_groups) => {
489                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
490                    return Err(general_err!(
491                        "row group {} out of bounds 0..{}",
492                        col,
493                        num_row_groups
494                    ));
495                }
496                row_groups.into()
497            }
498            None => (0..self.metadata.row_groups().len()).collect(),
499        };
500
501        // Try to avoid allocate large buffer
502        let batch_size = self
503            .batch_size
504            .min(self.metadata.file_metadata().num_rows() as usize);
505        let reader_factory = ReaderFactory {
506            input: self.input.0,
507            filter: self.filter,
508            metadata: self.metadata.clone(),
509            fields: self.fields,
510            limit: self.limit,
511            offset: self.offset,
512        };
513
514        // Ensure schema of ParquetRecordBatchStream respects projection, and does
515        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
516        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
517            Some(DataType::Struct(fields)) => {
518                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
519            }
520            None => Fields::empty(),
521            _ => unreachable!("Must be Struct for root type"),
522        };
523        let schema = Arc::new(Schema::new(projected_fields));
524
525        Ok(ParquetRecordBatchStream {
526            metadata: self.metadata,
527            batch_size,
528            row_groups,
529            projection: self.projection,
530            selection: self.selection,
531            schema,
532            reader_factory: Some(reader_factory),
533            state: StreamState::Init,
534        })
535    }
536}
537
538type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
539
540/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
541/// [`ParquetRecordBatchReader`]
542struct ReaderFactory<T> {
543    metadata: Arc<ParquetMetaData>,
544
545    fields: Option<Arc<ParquetField>>,
546
547    input: T,
548
549    filter: Option<RowFilter>,
550
551    limit: Option<usize>,
552
553    offset: Option<usize>,
554}
555
556impl<T> ReaderFactory<T>
557where
558    T: AsyncFileReader + Send,
559{
560    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
561    ///
562    /// Note: this captures self so that the resulting future has a static lifetime
563    async fn read_row_group(
564        mut self,
565        row_group_idx: usize,
566        mut selection: Option<RowSelection>,
567        projection: ProjectionMask,
568        batch_size: usize,
569    ) -> ReadResult<T> {
570        // TODO: calling build_array multiple times is wasteful
571
572        let meta = self.metadata.row_group(row_group_idx);
573        let offset_index = self
574            .metadata
575            .offset_index()
576            // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
577            .filter(|index| !index.is_empty())
578            .map(|x| x[row_group_idx].as_slice());
579
580        let mut row_group = InMemoryRowGroup {
581            // schema: meta.schema_descr_ptr(),
582            row_count: meta.num_rows() as usize,
583            column_chunks: vec![None; meta.columns().len()],
584            offset_index,
585            row_group_idx,
586            metadata: self.metadata.as_ref(),
587        };
588
589        if let Some(filter) = self.filter.as_mut() {
590            for predicate in filter.predicates.iter_mut() {
591                if !selects_any(selection.as_ref()) {
592                    return Ok((self, None));
593                }
594
595                let predicate_projection = predicate.projection();
596                row_group
597                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
598                    .await?;
599
600                let array_reader =
601                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
602
603                selection = Some(evaluate_predicate(
604                    batch_size,
605                    array_reader,
606                    selection,
607                    predicate.as_mut(),
608                )?);
609            }
610        }
611
612        // Compute the number of rows in the selection before applying limit and offset
613        let rows_before = selection
614            .as_ref()
615            .map(|s| s.row_count())
616            .unwrap_or(row_group.row_count);
617
618        if rows_before == 0 {
619            return Ok((self, None));
620        }
621
622        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);
623
624        // Compute the number of rows in the selection after applying limit and offset
625        let rows_after = selection
626            .as_ref()
627            .map(|s| s.row_count())
628            .unwrap_or(row_group.row_count);
629
630        // Update offset if necessary
631        if let Some(offset) = &mut self.offset {
632            // Reduction is either because of offset or limit, as limit is applied
633            // after offset has been "exhausted" can just use saturating sub here
634            *offset = offset.saturating_sub(rows_before - rows_after)
635        }
636
637        if rows_after == 0 {
638            return Ok((self, None));
639        }
640
641        if let Some(limit) = &mut self.limit {
642            *limit -= rows_after;
643        }
644
645        row_group
646            .fetch(&mut self.input, &projection, selection.as_ref())
647            .await?;
648
649        let reader = ParquetRecordBatchReader::new(
650            batch_size,
651            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
652            selection,
653        );
654
655        Ok((self, Some(reader)))
656    }
657}
658
659enum StreamState<T> {
660    /// At the start of a new row group, or the end of the parquet stream
661    Init,
662    /// Decoding a batch
663    Decoding(ParquetRecordBatchReader),
664    /// Reading data from input
665    Reading(BoxFuture<'static, ReadResult<T>>),
666    /// Error
667    Error,
668}
669
670impl<T> std::fmt::Debug for StreamState<T> {
671    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
672        match self {
673            StreamState::Init => write!(f, "StreamState::Init"),
674            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
675            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
676            StreamState::Error => write!(f, "StreamState::Error"),
677        }
678    }
679}
680
681/// An asynchronous [`Stream`]of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
682///
683/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
684/// allowing users to decode record batches separately from I/O.
685///
686/// # I/O Buffering
687///
688/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
689/// (projection + filtering, etc) and decodes the rows from those buffered pages.
690///
691/// For example, if all rows and columns are selected, the entire row group is
692/// buffered in memory during decode. This minimizes the number of IO operations
693/// required, which is especially important for object stores, where IO operations
694/// have latencies in the hundreds of milliseconds
695///
696///
697/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
698pub struct ParquetRecordBatchStream<T> {
699    metadata: Arc<ParquetMetaData>,
700
701    schema: SchemaRef,
702
703    row_groups: VecDeque<usize>,
704
705    projection: ProjectionMask,
706
707    batch_size: usize,
708
709    selection: Option<RowSelection>,
710
711    /// This is an option so it can be moved into a future
712    reader_factory: Option<ReaderFactory<T>>,
713
714    state: StreamState<T>,
715}
716
717impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
718    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
719        f.debug_struct("ParquetRecordBatchStream")
720            .field("metadata", &self.metadata)
721            .field("schema", &self.schema)
722            .field("batch_size", &self.batch_size)
723            .field("projection", &self.projection)
724            .field("state", &self.state)
725            .finish()
726    }
727}
728
729impl<T> ParquetRecordBatchStream<T> {
730    /// Returns the projected [`SchemaRef`] for reading the parquet file.
731    ///
732    /// Note that the schema metadata will be stripped here. See
733    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
734    pub fn schema(&self) -> &SchemaRef {
735        &self.schema
736    }
737}
738
739impl<T> ParquetRecordBatchStream<T>
740where
741    T: AsyncFileReader + Unpin + Send + 'static,
742{
743    /// Fetches the next row group from the stream.
744    ///
745    /// Users can continue to call this function to get row groups and decode them concurrently.
746    ///
747    /// ## Notes
748    ///
749    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
750    ///
751    /// ## Returns
752    ///
753    /// - `Ok(None)` if the stream has ended.
754    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
755    /// - `Ok(Some(reader))` which holds all the data for the row group.
756    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
757        loop {
758            match &mut self.state {
759                StreamState::Decoding(_) | StreamState::Reading(_) => {
760                    return Err(ParquetError::General(
761                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
762                    ))
763                }
764                StreamState::Init => {
765                    let row_group_idx = match self.row_groups.pop_front() {
766                        Some(idx) => idx,
767                        None => return Ok(None),
768                    };
769
770                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
771
772                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
773
774                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
775
776                    let (reader_factory, maybe_reader) = reader_factory
777                        .read_row_group(
778                            row_group_idx,
779                            selection,
780                            self.projection.clone(),
781                            self.batch_size,
782                        )
783                        .await
784                        .inspect_err(|_| {
785                            self.state = StreamState::Error;
786                        })?;
787                    self.reader_factory = Some(reader_factory);
788
789                    if let Some(reader) = maybe_reader {
790                        return Ok(Some(reader));
791                    } else {
792                        // All rows skipped, read next row group
793                        continue;
794                    }
795                }
796                StreamState::Error => return Ok(None), // Ends the stream as error happens.
797            }
798        }
799    }
800}
801
802impl<T> Stream for ParquetRecordBatchStream<T>
803where
804    T: AsyncFileReader + Unpin + Send + 'static,
805{
806    type Item = Result<RecordBatch>;
807
808    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
809        loop {
810            match &mut self.state {
811                StreamState::Decoding(batch_reader) => match batch_reader.next() {
812                    Some(Ok(batch)) => {
813                        return Poll::Ready(Some(Ok(batch)));
814                    }
815                    Some(Err(e)) => {
816                        self.state = StreamState::Error;
817                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
818                    }
819                    None => self.state = StreamState::Init,
820                },
821                StreamState::Init => {
822                    let row_group_idx = match self.row_groups.pop_front() {
823                        Some(idx) => idx,
824                        None => return Poll::Ready(None),
825                    };
826
827                    let reader = self.reader_factory.take().expect("lost reader factory");
828
829                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
830
831                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
832
833                    let fut = reader
834                        .read_row_group(
835                            row_group_idx,
836                            selection,
837                            self.projection.clone(),
838                            self.batch_size,
839                        )
840                        .boxed();
841
842                    self.state = StreamState::Reading(fut)
843                }
844                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
845                    Ok((reader_factory, maybe_reader)) => {
846                        self.reader_factory = Some(reader_factory);
847                        match maybe_reader {
848                            // Read records from [`ParquetRecordBatchReader`]
849                            Some(reader) => self.state = StreamState::Decoding(reader),
850                            // All rows skipped, read next row group
851                            None => self.state = StreamState::Init,
852                        }
853                    }
854                    Err(e) => {
855                        self.state = StreamState::Error;
856                        return Poll::Ready(Some(Err(e)));
857                    }
858                },
859                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
860            }
861        }
862    }
863}
864
865/// An in-memory collection of column chunks
866struct InMemoryRowGroup<'a> {
867    offset_index: Option<&'a [OffsetIndexMetaData]>,
868    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
869    row_count: usize,
870    row_group_idx: usize,
871    metadata: &'a ParquetMetaData,
872}
873
874impl InMemoryRowGroup<'_> {
875    /// Fetches the necessary column data into memory
876    async fn fetch<T: AsyncFileReader + Send>(
877        &mut self,
878        input: &mut T,
879        projection: &ProjectionMask,
880        selection: Option<&RowSelection>,
881    ) -> Result<()> {
882        let metadata = self.metadata.row_group(self.row_group_idx);
883        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
884            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
885            // `RowSelection`
886            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
887
888            let fetch_ranges = self
889                .column_chunks
890                .iter()
891                .zip(metadata.columns())
892                .enumerate()
893                .filter(|&(idx, (chunk, _chunk_meta))| {
894                    chunk.is_none() && projection.leaf_included(idx)
895                })
896                .flat_map(|(idx, (_chunk, chunk_meta))| {
897                    // If the first page does not start at the beginning of the column,
898                    // then we need to also fetch a dictionary page.
899                    let mut ranges: Vec<Range<u64>> = vec![];
900                    let (start, _len) = chunk_meta.byte_range();
901                    match offset_index[idx].page_locations.first() {
902                        Some(first) if first.offset as u64 != start => {
903                            ranges.push(start..first.offset as u64);
904                        }
905                        _ => (),
906                    }
907
908                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
909                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
910
911                    ranges
912                })
913                .collect();
914
915            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
916            let mut page_start_offsets = page_start_offsets.into_iter();
917
918            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
919                if chunk.is_some() || !projection.leaf_included(idx) {
920                    continue;
921                }
922
923                if let Some(offsets) = page_start_offsets.next() {
924                    let mut chunks = Vec::with_capacity(offsets.len());
925                    for _ in 0..offsets.len() {
926                        chunks.push(chunk_data.next().unwrap());
927                    }
928
929                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
930                        length: metadata.column(idx).byte_range().1 as usize,
931                        data: offsets
932                            .into_iter()
933                            .map(|x| x as usize)
934                            .zip(chunks.into_iter())
935                            .collect(),
936                    }))
937                }
938            }
939        } else {
940            let fetch_ranges = self
941                .column_chunks
942                .iter()
943                .enumerate()
944                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
945                .map(|(idx, _chunk)| {
946                    let column = metadata.column(idx);
947                    let (start, length) = column.byte_range();
948                    start..(start + length)
949                })
950                .collect();
951
952            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
953
954            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
955                if chunk.is_some() || !projection.leaf_included(idx) {
956                    continue;
957                }
958
959                if let Some(data) = chunk_data.next() {
960                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
961                        offset: metadata.column(idx).byte_range().0 as usize,
962                        data,
963                    }));
964                }
965            }
966        }
967
968        Ok(())
969    }
970}
971
972impl RowGroups for InMemoryRowGroup<'_> {
973    fn num_rows(&self) -> usize {
974        self.row_count
975    }
976
977    /// Return chunks for column i
978    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
979        match &self.column_chunks[i] {
980            None => Err(ParquetError::General(format!(
981                "Invalid column index {i}, column was not fetched"
982            ))),
983            Some(data) => {
984                let page_locations = self
985                    .offset_index
986                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
987                    .filter(|index| !index.is_empty())
988                    .map(|index| index[i].page_locations.clone());
989                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
990                let page_reader = SerializedPageReader::new(
991                    data.clone(),
992                    column_chunk_metadata,
993                    self.row_count,
994                    page_locations,
995                )?;
996                let page_reader = page_reader.add_crypto_context(
997                    self.row_group_idx,
998                    i,
999                    self.metadata,
1000                    column_chunk_metadata,
1001                )?;
1002
1003                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1004
1005                Ok(Box::new(ColumnChunkIterator {
1006                    reader: Some(Ok(page_reader)),
1007                }))
1008            }
1009        }
1010    }
1011}
1012
1013/// An in-memory column chunk
1014#[derive(Clone)]
1015enum ColumnChunkData {
1016    /// Column chunk data representing only a subset of data pages
1017    Sparse {
1018        /// Length of the full column chunk
1019        length: usize,
1020        /// Set of data pages included in this sparse chunk. Each element is a tuple
1021        /// of (page offset, page data)
1022        data: Vec<(usize, Bytes)>,
1023    },
1024    /// Full column chunk and its offset
1025    Dense { offset: usize, data: Bytes },
1026}
1027
1028impl ColumnChunkData {
1029    fn get(&self, start: u64) -> Result<Bytes> {
1030        match &self {
1031            ColumnChunkData::Sparse { data, .. } => data
1032                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1033                .map(|idx| data[idx].1.clone())
1034                .map_err(|_| {
1035                    ParquetError::General(format!(
1036                        "Invalid offset in sparse column chunk data: {start}"
1037                    ))
1038                }),
1039            ColumnChunkData::Dense { offset, data } => {
1040                let start = start as usize - *offset;
1041                Ok(data.slice(start..))
1042            }
1043        }
1044    }
1045}
1046
1047impl Length for ColumnChunkData {
1048    fn len(&self) -> u64 {
1049        match &self {
1050            ColumnChunkData::Sparse { length, .. } => *length as u64,
1051            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1052        }
1053    }
1054}
1055
1056impl ChunkReader for ColumnChunkData {
1057    type T = bytes::buf::Reader<Bytes>;
1058
1059    fn get_read(&self, start: u64) -> Result<Self::T> {
1060        Ok(self.get(start)?.reader())
1061    }
1062
1063    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1064        Ok(self.get(start)?.slice(..length))
1065    }
1066}
1067
1068/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1069struct ColumnChunkIterator {
1070    reader: Option<Result<Box<dyn PageReader>>>,
1071}
1072
1073impl Iterator for ColumnChunkIterator {
1074    type Item = Result<Box<dyn PageReader>>;
1075
1076    fn next(&mut self) -> Option<Self::Item> {
1077        self.reader.take()
1078    }
1079}
1080
1081impl PageIterator for ColumnChunkIterator {}
1082
1083#[cfg(test)]
1084mod tests {
1085    use super::*;
1086    use crate::arrow::arrow_reader::{
1087        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1088    };
1089    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1090    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1091    use crate::arrow::ArrowWriter;
1092    use crate::file::metadata::ParquetMetaDataReader;
1093    use crate::file::properties::WriterProperties;
1094    use arrow::compute::kernels::cmp::eq;
1095    use arrow::error::Result as ArrowResult;
1096    use arrow_array::builder::{ListBuilder, StringBuilder};
1097    use arrow_array::cast::AsArray;
1098    use arrow_array::types::Int32Type;
1099    use arrow_array::{
1100        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1101        StructArray, UInt64Array,
1102    };
1103    use arrow_schema::{DataType, Field, Schema};
1104    use futures::{StreamExt, TryStreamExt};
1105    use rand::{rng, Rng};
1106    use std::collections::HashMap;
1107    use std::sync::{Arc, Mutex};
1108    use tempfile::tempfile;
1109
1110    #[derive(Clone)]
1111    struct TestReader {
1112        data: Bytes,
1113        metadata: Option<Arc<ParquetMetaData>>,
1114        requests: Arc<Mutex<Vec<Range<usize>>>>,
1115    }
1116
1117    impl AsyncFileReader for TestReader {
1118        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1119            let range = range.clone();
1120            self.requests
1121                .lock()
1122                .unwrap()
1123                .push(range.start as usize..range.end as usize);
1124            futures::future::ready(Ok(self
1125                .data
1126                .slice(range.start as usize..range.end as usize)))
1127            .boxed()
1128        }
1129
1130        fn get_metadata<'a>(
1131            &'a mut self,
1132            options: Option<&'a ArrowReaderOptions>,
1133        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1134            let metadata_reader = ParquetMetaDataReader::new()
1135                .with_page_indexes(options.is_some_and(|o| o.page_index));
1136            self.metadata = Some(Arc::new(
1137                metadata_reader.parse_and_finish(&self.data).unwrap(),
1138            ));
1139            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1140        }
1141    }
1142
1143    #[tokio::test]
1144    async fn test_async_reader() {
1145        let testdata = arrow::util::test_util::parquet_test_data();
1146        let path = format!("{testdata}/alltypes_plain.parquet");
1147        let data = Bytes::from(std::fs::read(path).unwrap());
1148
1149        let async_reader = TestReader {
1150            data: data.clone(),
1151            metadata: Default::default(),
1152            requests: Default::default(),
1153        };
1154
1155        let requests = async_reader.requests.clone();
1156        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1157            .await
1158            .unwrap();
1159
1160        let metadata = builder.metadata().clone();
1161        assert_eq!(metadata.num_row_groups(), 1);
1162
1163        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1164        let stream = builder
1165            .with_projection(mask.clone())
1166            .with_batch_size(1024)
1167            .build()
1168            .unwrap();
1169
1170        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1171
1172        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1173            .unwrap()
1174            .with_projection(mask)
1175            .with_batch_size(104)
1176            .build()
1177            .unwrap()
1178            .collect::<ArrowResult<Vec<_>>>()
1179            .unwrap();
1180
1181        assert_eq!(async_batches, sync_batches);
1182
1183        let requests = requests.lock().unwrap();
1184        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1185        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1186
1187        assert_eq!(
1188            &requests[..],
1189            &[
1190                offset_1 as usize..(offset_1 + length_1) as usize,
1191                offset_2 as usize..(offset_2 + length_2) as usize
1192            ]
1193        );
1194    }
1195
1196    #[tokio::test]
1197    async fn test_async_reader_with_next_row_group() {
1198        let testdata = arrow::util::test_util::parquet_test_data();
1199        let path = format!("{testdata}/alltypes_plain.parquet");
1200        let data = Bytes::from(std::fs::read(path).unwrap());
1201
1202        let async_reader = TestReader {
1203            data: data.clone(),
1204            metadata: Default::default(),
1205            requests: Default::default(),
1206        };
1207
1208        let requests = async_reader.requests.clone();
1209        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1210            .await
1211            .unwrap();
1212
1213        let metadata = builder.metadata().clone();
1214        assert_eq!(metadata.num_row_groups(), 1);
1215
1216        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1217        let mut stream = builder
1218            .with_projection(mask.clone())
1219            .with_batch_size(1024)
1220            .build()
1221            .unwrap();
1222
1223        let mut readers = vec![];
1224        while let Some(reader) = stream.next_row_group().await.unwrap() {
1225            readers.push(reader);
1226        }
1227
1228        let async_batches: Vec<_> = readers
1229            .into_iter()
1230            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1231            .collect();
1232
1233        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1234            .unwrap()
1235            .with_projection(mask)
1236            .with_batch_size(104)
1237            .build()
1238            .unwrap()
1239            .collect::<ArrowResult<Vec<_>>>()
1240            .unwrap();
1241
1242        assert_eq!(async_batches, sync_batches);
1243
1244        let requests = requests.lock().unwrap();
1245        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1246        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1247
1248        assert_eq!(
1249            &requests[..],
1250            &[
1251                offset_1 as usize..(offset_1 + length_1) as usize,
1252                offset_2 as usize..(offset_2 + length_2) as usize
1253            ]
1254        );
1255    }
1256
1257    #[tokio::test]
1258    async fn test_async_reader_with_index() {
1259        let testdata = arrow::util::test_util::parquet_test_data();
1260        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1261        let data = Bytes::from(std::fs::read(path).unwrap());
1262
1263        let async_reader = TestReader {
1264            data: data.clone(),
1265            metadata: Default::default(),
1266            requests: Default::default(),
1267        };
1268
1269        let options = ArrowReaderOptions::new().with_page_index(true);
1270        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1271            .await
1272            .unwrap();
1273
1274        // The builder should have page and offset indexes loaded now
1275        let metadata_with_index = builder.metadata();
1276        assert_eq!(metadata_with_index.num_row_groups(), 1);
1277
1278        // Check offset indexes are present for all columns
1279        let offset_index = metadata_with_index.offset_index().unwrap();
1280        let column_index = metadata_with_index.column_index().unwrap();
1281
1282        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1283        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1284
1285        let num_columns = metadata_with_index
1286            .file_metadata()
1287            .schema_descr()
1288            .num_columns();
1289
1290        // Check page indexes are present for all columns
1291        offset_index
1292            .iter()
1293            .for_each(|x| assert_eq!(x.len(), num_columns));
1294        column_index
1295            .iter()
1296            .for_each(|x| assert_eq!(x.len(), num_columns));
1297
1298        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1299        let stream = builder
1300            .with_projection(mask.clone())
1301            .with_batch_size(1024)
1302            .build()
1303            .unwrap();
1304
1305        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1306
1307        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1308            .unwrap()
1309            .with_projection(mask)
1310            .with_batch_size(1024)
1311            .build()
1312            .unwrap()
1313            .collect::<ArrowResult<Vec<_>>>()
1314            .unwrap();
1315
1316        assert_eq!(async_batches, sync_batches);
1317    }
1318
1319    #[tokio::test]
1320    async fn test_async_reader_with_limit() {
1321        let testdata = arrow::util::test_util::parquet_test_data();
1322        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1323        let data = Bytes::from(std::fs::read(path).unwrap());
1324
1325        let metadata = ParquetMetaDataReader::new()
1326            .parse_and_finish(&data)
1327            .unwrap();
1328        let metadata = Arc::new(metadata);
1329
1330        assert_eq!(metadata.num_row_groups(), 1);
1331
1332        let async_reader = TestReader {
1333            data: data.clone(),
1334            metadata: Default::default(),
1335            requests: Default::default(),
1336        };
1337
1338        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1339            .await
1340            .unwrap();
1341
1342        assert_eq!(builder.metadata().num_row_groups(), 1);
1343
1344        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1345        let stream = builder
1346            .with_projection(mask.clone())
1347            .with_batch_size(1024)
1348            .with_limit(1)
1349            .build()
1350            .unwrap();
1351
1352        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1353
1354        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1355            .unwrap()
1356            .with_projection(mask)
1357            .with_batch_size(1024)
1358            .with_limit(1)
1359            .build()
1360            .unwrap()
1361            .collect::<ArrowResult<Vec<_>>>()
1362            .unwrap();
1363
1364        assert_eq!(async_batches, sync_batches);
1365    }
1366
1367    #[tokio::test]
1368    async fn test_async_reader_skip_pages() {
1369        let testdata = arrow::util::test_util::parquet_test_data();
1370        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1371        let data = Bytes::from(std::fs::read(path).unwrap());
1372
1373        let async_reader = TestReader {
1374            data: data.clone(),
1375            metadata: Default::default(),
1376            requests: Default::default(),
1377        };
1378
1379        let options = ArrowReaderOptions::new().with_page_index(true);
1380        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1381            .await
1382            .unwrap();
1383
1384        assert_eq!(builder.metadata().num_row_groups(), 1);
1385
1386        let selection = RowSelection::from(vec![
1387            RowSelector::skip(21),   // Skip first page
1388            RowSelector::select(21), // Select page to boundary
1389            RowSelector::skip(41),   // Skip multiple pages
1390            RowSelector::select(41), // Select multiple pages
1391            RowSelector::skip(25),   // Skip page across boundary
1392            RowSelector::select(25), // Select across page boundary
1393            RowSelector::skip(7116), // Skip to final page boundary
1394            RowSelector::select(10), // Select final page
1395        ]);
1396
1397        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1398
1399        let stream = builder
1400            .with_projection(mask.clone())
1401            .with_row_selection(selection.clone())
1402            .build()
1403            .expect("building stream");
1404
1405        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1406
1407        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1408            .unwrap()
1409            .with_projection(mask)
1410            .with_batch_size(1024)
1411            .with_row_selection(selection)
1412            .build()
1413            .unwrap()
1414            .collect::<ArrowResult<Vec<_>>>()
1415            .unwrap();
1416
1417        assert_eq!(async_batches, sync_batches);
1418    }
1419
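    // Fuzz test: apply 100 random `RowSelection`s to a randomly chosen column and
    // check that the number of rows returned matches the number of rows selected.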
1420    #[tokio::test]
1421    async fn test_fuzz_async_reader_selection() {
1422        let testdata = arrow::util::test_util::parquet_test_data();
1423        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1424        let data = Bytes::from(std::fs::read(path).unwrap());
1425
1426        let mut rand = rng();
1427
1428        for _ in 0..100 {
1429            let mut expected_rows = 0;
1430            let mut total_rows = 0;
1431            let mut skip = false;
1432            let mut selectors = vec![];
1433
1434            while total_rows < 7300 {
1435                let row_count: usize = rand.random_range(1..100);
1436
1437                let row_count = row_count.min(7300 - total_rows);
1438
1439                selectors.push(RowSelector { row_count, skip });
1440
1441                total_rows += row_count;
1442                if !skip {
1443                    expected_rows += row_count;
1444                }
1445
1446                skip = !skip;
1447            }
1448
1449            let selection = RowSelection::from(selectors);
1450
1451            let async_reader = TestReader {
1452                data: data.clone(),
1453                metadata: Default::default(),
1454                requests: Default::default(),
1455            };
1456
1457            let options = ArrowReaderOptions::new().with_page_index(true);
1458            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1459                .await
1460                .unwrap();
1461
1462            assert_eq!(builder.metadata().num_row_groups(), 1);
1463
1464            let col_idx: usize = rand.random_range(0..13);
1465            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1466
1467            let stream = builder
1468                .with_projection(mask.clone())
1469                .with_row_selection(selection.clone())
1470                .build()
1471                .expect("building stream");
1472
1473            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1474
1475            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1476
1477            assert_eq!(actual_rows, expected_rows);
1478        }
1479    }
1480
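    // A leading zero-row selector in the `RowSelection` must not change the rows returned.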
1481    #[tokio::test]
1482    async fn test_async_reader_zero_row_selector() {
1483        // See https://github.com/apache/arrow-rs/issues/2669
1484        let testdata = arrow::util::test_util::parquet_test_data();
1485        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1486        let data = Bytes::from(std::fs::read(path).unwrap());
1487
1488        let mut rand = rng();
1489
1490        let mut expected_rows = 0;
1491        let mut total_rows = 0;
1492        let mut skip = false;
1493        let mut selectors = vec![];
1494
1495        selectors.push(RowSelector {
1496            row_count: 0,
1497            skip: false,
1498        });
1499
1500        while total_rows < 7300 {
1501            let row_count: usize = rand.random_range(1..100);
1502
1503            let row_count = row_count.min(7300 - total_rows);
1504
1505            selectors.push(RowSelector { row_count, skip });
1506
1507            total_rows += row_count;
1508            if !skip {
1509                expected_rows += row_count;
1510            }
1511
1512            skip = !skip;
1513        }
1514
1515        let selection = RowSelection::from(selectors);
1516
1517        let async_reader = TestReader {
1518            data: data.clone(),
1519            metadata: Default::default(),
1520            requests: Default::default(),
1521        };
1522
1523        let options = ArrowReaderOptions::new().with_page_index(true);
1524        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1525            .await
1526            .unwrap();
1527
1528        assert_eq!(builder.metadata().num_row_groups(), 1);
1529
1530        let col_idx: usize = rand.random_range(0..13);
1531        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1532
1533        let stream = builder
1534            .with_projection(mask.clone())
1535            .with_row_selection(selection.clone())
1536            .build()
1537            .expect("building stream");
1538
1539        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1540
1541        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1542
1543        assert_eq!(actual_rows, expected_rows);
1544    }
1545
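    // Apply two chained `ArrowPredicateFn` row filters and check both the surviving
    // row and the number of read requests issued against the underlying reader.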
1546    #[tokio::test]
1547    async fn test_row_filter() {
1548        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1549        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1550        let c = Int32Array::from_iter(0..6);
1551        let data = RecordBatch::try_from_iter([
1552            ("a", Arc::new(a) as ArrayRef),
1553            ("b", Arc::new(b) as ArrayRef),
1554            ("c", Arc::new(c) as ArrayRef),
1555        ])
1556        .unwrap();
1557
1558        let mut buf = Vec::with_capacity(1024);
1559        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1560        writer.write(&data).unwrap();
1561        writer.close().unwrap();
1562
1563        let data: Bytes = buf.into();
1564        let metadata = ParquetMetaDataReader::new()
1565            .parse_and_finish(&data)
1566            .unwrap();
1567        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1568
1569        let test = TestReader {
1570            data,
1571            metadata: Default::default(),
1572            requests: Default::default(),
1573        };
1574        let requests = test.requests.clone();
1575
1576        let a_scalar = StringArray::from_iter_values(["b"]);
1577        let a_filter = ArrowPredicateFn::new(
1578            ProjectionMask::leaves(&parquet_schema, vec![0]),
1579            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1580        );
1581
1582        let b_scalar = StringArray::from_iter_values(["4"]);
1583        let b_filter = ArrowPredicateFn::new(
1584            ProjectionMask::leaves(&parquet_schema, vec![1]),
1585            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1586        );
1587
1588        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1589
1590        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1591        let stream = ParquetRecordBatchStreamBuilder::new(test)
1592            .await
1593            .unwrap()
1594            .with_projection(mask.clone())
1595            .with_batch_size(1024)
1596            .with_row_filter(filter)
1597            .build()
1598            .unwrap();
1599
1600        let batches: Vec<_> = stream.try_collect().await.unwrap();
1601        assert_eq!(batches.len(), 1);
1602
1603        let batch = &batches[0];
1604        assert_eq!(batch.num_rows(), 1);
1605        assert_eq!(batch.num_columns(), 2);
1606
1607        let col = batch.column(0);
1608        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1609        assert_eq!(val, "b");
1610
1611        let col = batch.column(1);
1612        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1613        assert_eq!(val, 3);
1614
1615        // Should only have made 3 requests
1616        assert_eq!(requests.lock().unwrap().len(), 3);
1617    }
1618
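    // Check limit and offset handling when the rows are spread across multiple row groups.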
1619    #[tokio::test]
1620    async fn test_limit_multiple_row_groups() {
1621        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1622        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1623        let c = Int32Array::from_iter(0..6);
1624        let data = RecordBatch::try_from_iter([
1625            ("a", Arc::new(a) as ArrayRef),
1626            ("b", Arc::new(b) as ArrayRef),
1627            ("c", Arc::new(c) as ArrayRef),
1628        ])
1629        .unwrap();
1630
1631        let mut buf = Vec::with_capacity(1024);
1632        let props = WriterProperties::builder()
1633            .set_max_row_group_size(3)
1634            .build();
1635        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1636        writer.write(&data).unwrap();
1637        writer.close().unwrap();
1638
1639        let data: Bytes = buf.into();
1640        let metadata = ParquetMetaDataReader::new()
1641            .parse_and_finish(&data)
1642            .unwrap();
1643
1644        assert_eq!(metadata.num_row_groups(), 2);
1645
1646        let test = TestReader {
1647            data,
1648            metadata: Default::default(),
1649            requests: Default::default(),
1650        };
1651
1652        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1653            .await
1654            .unwrap()
1655            .with_batch_size(1024)
1656            .with_limit(4)
1657            .build()
1658            .unwrap();
1659
1660        let batches: Vec<_> = stream.try_collect().await.unwrap();
1661        // Expect one batch for each row group
1662        assert_eq!(batches.len(), 2);
1663
1664        let batch = &batches[0];
1665        // First batch should contain all rows of the first row group
1666        assert_eq!(batch.num_rows(), 3);
1667        assert_eq!(batch.num_columns(), 3);
1668        let col2 = batch.column(2).as_primitive::<Int32Type>();
1669        assert_eq!(col2.values(), &[0, 1, 2]);
1670
1671        let batch = &batches[1];
1672        // Second batch should trigger the limit and only have one row
1673        assert_eq!(batch.num_rows(), 1);
1674        assert_eq!(batch.num_columns(), 3);
1675        let col2 = batch.column(2).as_primitive::<Int32Type>();
1676        assert_eq!(col2.values(), &[3]);
1677
1678        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1679            .await
1680            .unwrap()
1681            .with_offset(2)
1682            .with_limit(3)
1683            .build()
1684            .unwrap();
1685
1686        let batches: Vec<_> = stream.try_collect().await.unwrap();
1687        // Expect one batch for each row group
1688        assert_eq!(batches.len(), 2);
1689
1690        let batch = &batches[0];
1691        // First batch should contain one row
1692        assert_eq!(batch.num_rows(), 1);
1693        assert_eq!(batch.num_columns(), 3);
1694        let col2 = batch.column(2).as_primitive::<Int32Type>();
1695        assert_eq!(col2.values(), &[2]);
1696
1697        let batch = &batches[1];
1698        // Second batch should contain two rows
1699        assert_eq!(batch.num_rows(), 2);
1700        assert_eq!(batch.num_columns(), 3);
1701        let col2 = batch.column(2).as_primitive::<Int32Type>();
1702        assert_eq!(col2.values(), &[3, 4]);
1703
1704        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1705            .await
1706            .unwrap()
1707            .with_offset(4)
1708            .with_limit(20)
1709            .build()
1710            .unwrap();
1711
1712        let batches: Vec<_> = stream.try_collect().await.unwrap();
1713        // Should skip first row group
1714        assert_eq!(batches.len(), 1);
1715
1716        let batch = &batches[0];
1717        // First batch should contain two rows
1718        assert_eq!(batch.num_rows(), 2);
1719        assert_eq!(batch.num_columns(), 3);
1720        let col2 = batch.column(2).as_primitive::<Int32Type>();
1721        assert_eq!(col2.values(), &[4, 5]);
1722    }
1723
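    // Row filters combined with the page index should still yield the expected row count.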
1724    #[tokio::test]
1725    async fn test_row_filter_with_index() {
1726        let testdata = arrow::util::test_util::parquet_test_data();
1727        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1728        let data = Bytes::from(std::fs::read(path).unwrap());
1729
1730        let metadata = ParquetMetaDataReader::new()
1731            .parse_and_finish(&data)
1732            .unwrap();
1733        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1734
1735        assert_eq!(metadata.num_row_groups(), 1);
1736
1737        let async_reader = TestReader {
1738            data: data.clone(),
1739            metadata: Default::default(),
1740            requests: Default::default(),
1741        };
1742
1743        let a_filter =
1744            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1745                Ok(batch.column(0).as_boolean().clone())
1746            });
1747
1748        let b_scalar = Int8Array::from(vec![2]);
1749        let b_filter = ArrowPredicateFn::new(
1750            ProjectionMask::leaves(&parquet_schema, vec![2]),
1751            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1752        );
1753
1754        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1755
1756        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1757
1758        let options = ArrowReaderOptions::new().with_page_index(true);
1759        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1760            .await
1761            .unwrap()
1762            .with_projection(mask.clone())
1763            .with_batch_size(1024)
1764            .with_row_filter(filter)
1765            .build()
1766            .unwrap();
1767
1768        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1769
1770        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1771
1772        assert_eq!(total_rows, 730);
1773    }
1774
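    // A sparse `RowSelection` should cause only the byte ranges of the selected
    // pages to be fetched from the underlying reader.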
1775    #[tokio::test]
1776    async fn test_in_memory_row_group_sparse() {
1777        let testdata = arrow::util::test_util::parquet_test_data();
1778        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1779        let data = Bytes::from(std::fs::read(path).unwrap());
1780
1781        let metadata = ParquetMetaDataReader::new()
1782            .with_page_indexes(true)
1783            .parse_and_finish(&data)
1784            .unwrap();
1785
1786        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1787
1788        let mut metadata_builder = metadata.into_builder();
1789        let mut row_groups = metadata_builder.take_row_groups();
1790        row_groups.truncate(1);
1791        let row_group_meta = row_groups.pop().unwrap();
1792
1793        let metadata = metadata_builder
1794            .add_row_group(row_group_meta)
1795            .set_column_index(None)
1796            .set_offset_index(Some(vec![offset_index.clone()]))
1797            .build();
1798
1799        let metadata = Arc::new(metadata);
1800
1801        let num_rows = metadata.row_group(0).num_rows();
1802
1803        assert_eq!(metadata.num_row_groups(), 1);
1804
1805        let async_reader = TestReader {
1806            data: data.clone(),
1807            metadata: Default::default(),
1808            requests: Default::default(),
1809        };
1810
1811        let requests = async_reader.requests.clone();
1812        let (_, fields) = parquet_to_arrow_schema_and_fields(
1813            metadata.file_metadata().schema_descr(),
1814            ProjectionMask::all(),
1815            None,
1816        )
1817        .unwrap();
1818
1819        let _schema_desc = metadata.file_metadata().schema_descr();
1820
1821        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1822
1823        let reader_factory = ReaderFactory {
1824            metadata,
1825            fields: fields.map(Arc::new),
1826            input: async_reader,
1827            filter: None,
1828            limit: None,
1829            offset: None,
1830        };
1831
1832        let mut skip = true;
1833        let mut pages = offset_index[0].page_locations.iter().peekable();
1834
1835        // Setup `RowSelection` so that we can skip every other page, selecting the last page
1836        let mut selectors = vec![];
1837        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1838        while let Some(page) = pages.next() {
1839            let num_rows = if let Some(next_page) = pages.peek() {
1840                next_page.first_row_index - page.first_row_index
1841            } else {
1842                num_rows - page.first_row_index
1843            };
1844
1845            if skip {
1846                selectors.push(RowSelector::skip(num_rows as usize));
1847            } else {
1848                selectors.push(RowSelector::select(num_rows as usize));
1849                let start = page.offset as usize;
1850                let end = start + page.compressed_page_size as usize;
1851                expected_page_requests.push(start..end);
1852            }
1853            skip = !skip;
1854        }
1855
1856        let selection = RowSelection::from(selectors);
1857
1858        let (_factory, _reader) = reader_factory
1859            .read_row_group(0, Some(selection), projection.clone(), 48)
1860            .await
1861            .expect("reading row group");
1862
1863        let requests = requests.lock().unwrap();
1864
1865        assert_eq!(&requests[..], &expected_page_requests)
1866    }
1867
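    // The configured batch size should be clamped to the number of rows in the
    // file so the reader does not over-allocate.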
1868    #[tokio::test]
1869    async fn test_batch_size_overallocate() {
1870        let testdata = arrow::util::test_util::parquet_test_data();
1871        // `alltypes_plain.parquet` only has 8 rows
1872        let path = format!("{testdata}/alltypes_plain.parquet");
1873        let data = Bytes::from(std::fs::read(path).unwrap());
1874
1875        let async_reader = TestReader {
1876            data: data.clone(),
1877            metadata: Default::default(),
1878            requests: Default::default(),
1879        };
1880
1881        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1882            .await
1883            .unwrap();
1884
1885        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1886
1887        let stream = builder
1888            .with_projection(ProjectionMask::all())
1889            .with_batch_size(1024)
1890            .build()
1891            .unwrap();
1892        assert_ne!(1024, file_rows);
1893        assert_eq!(stream.batch_size, file_rows);
1894    }
1895
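    // Read a bloom filter from a file whose column metadata does not include bloom_filter_length.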
1896    #[tokio::test]
1897    async fn test_get_row_group_column_bloom_filter_without_length() {
1898        let testdata = arrow::util::test_util::parquet_test_data();
1899        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1900        let data = Bytes::from(std::fs::read(path).unwrap());
1901        test_get_row_group_column_bloom_filter(data, false).await;
1902    }
1903
1904    #[tokio::test]
1905    async fn test_parquet_record_batch_stream_schema() {
1906        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1907            schema.flattened_fields().iter().map(|f| f.name()).collect()
1908        }
1909
1910        // ParquetRecordBatchReaderBuilder::schema differs from
1911        // ParquetRecordBatchReader::schema and RecordBatch::schema: the builder exposes
1912        // the full file schema (including custom metadata), while the reader and batch
1913        // expose only the projected fields with no metadata. Ensure this stays consistent.
1914        //
1915        // The asynchronous versions below must behave the same way.
1916
1917        // Prepare data for a schema with nested fields and custom metadata
1918        let mut metadata = HashMap::with_capacity(1);
1919        metadata.insert("key".to_string(), "value".to_string());
1920
1921        let nested_struct_array = StructArray::from(vec![
1922            (
1923                Arc::new(Field::new("d", DataType::Utf8, true)),
1924                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1925            ),
1926            (
1927                Arc::new(Field::new("e", DataType::Utf8, true)),
1928                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1929            ),
1930        ]);
1931        let struct_array = StructArray::from(vec![
1932            (
1933                Arc::new(Field::new("a", DataType::Int32, true)),
1934                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1935            ),
1936            (
1937                Arc::new(Field::new("b", DataType::UInt64, true)),
1938                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1939            ),
1940            (
1941                Arc::new(Field::new(
1942                    "c",
1943                    nested_struct_array.data_type().clone(),
1944                    true,
1945                )),
1946                Arc::new(nested_struct_array) as ArrayRef,
1947            ),
1948        ]);
1949
1950        let schema =
1951            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1952        let record_batch = RecordBatch::from(struct_array)
1953            .with_schema(schema.clone())
1954            .unwrap();
1955
1956        // Write parquet with custom metadata in schema
1957        let mut file = tempfile().unwrap();
1958        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1959        writer.write(&record_batch).unwrap();
1960        writer.close().unwrap();
1961
1962        let all_fields = ["a", "b", "c", "d", "e"];
1963        // (leaf indices in the projection mask, expected flattened field names in the projected schema)
1964        let projections = [
1965            (vec![], vec![]),
1966            (vec![0], vec!["a"]),
1967            (vec![0, 1], vec!["a", "b"]),
1968            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1969            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1970        ];
1971
1972        // Ensure we're consistent for each of these projections
1973        for (indices, expected_projected_names) in projections {
1974            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1975                // Builder schema should preserve all fields and metadata
1976                assert_eq!(get_all_field_names(&builder), all_fields);
1977                assert_eq!(builder.metadata, metadata);
1978                // Reader & batch schema should show only projected fields, and no metadata
1979                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1980                assert_eq!(reader.metadata, HashMap::default());
1981                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1982                assert_eq!(batch.metadata, HashMap::default());
1983            };
1984
1985            let builder =
1986                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1987            let sync_builder_schema = builder.schema().clone();
1988            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1989            let mut reader = builder.with_projection(mask).build().unwrap();
1990            let sync_reader_schema = reader.schema();
1991            let batch = reader.next().unwrap().unwrap();
1992            let sync_batch_schema = batch.schema();
1993            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1994
1995            // The asynchronous reader should behave identically
1996            let file = tokio::fs::File::from(file.try_clone().unwrap());
1997            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1998            let async_builder_schema = builder.schema().clone();
1999            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2000            let mut reader = builder.with_projection(mask).build().unwrap();
2001            let async_reader_schema = reader.schema().clone();
2002            let batch = reader.next().await.unwrap().unwrap();
2003            let async_batch_schema = batch.schema();
2004            assert_schemas(
2005                async_builder_schema,
2006                async_reader_schema,
2007                async_batch_schema,
2008            );
2009        }
2010    }
2011
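    // Rewrite the test file with bloom filters enabled so the column metadata
    // includes bloom_filter_length, then read the filter back.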
2012    #[tokio::test]
2013    async fn test_get_row_group_column_bloom_filter_with_length() {
2014        // Rewrite the data as a new parquet file whose metadata includes bloom_filter_length
2015        let testdata = arrow::util::test_util::parquet_test_data();
2016        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2017        let data = Bytes::from(std::fs::read(path).unwrap());
2018        let async_reader = TestReader {
2019            data: data.clone(),
2020            metadata: Default::default(),
2021            requests: Default::default(),
2022        };
2023        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2024            .await
2025            .unwrap();
2026        let schema = builder.schema().clone();
2027        let stream = builder.build().unwrap();
2028        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2029
2030        let mut parquet_data = Vec::new();
2031        let props = WriterProperties::builder()
2032            .set_bloom_filter_enabled(true)
2033            .build();
2034        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2035        for batch in batches {
2036            writer.write(&batch).unwrap();
2037        }
2038        writer.close().unwrap();
2039
2040        // Test the rewritten parquet file
2041        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2042    }
2043
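    // Shared helper: reads the bloom filter for row group 0, column 0, and probes
    // it for a value that is present and one that is not.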
2044    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2045        let async_reader = TestReader {
2046            data: data.clone(),
2047            metadata: Default::default(),
2048            requests: Default::default(),
2049        };
2050
2051        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2052            .await
2053            .unwrap();
2054
2055        let metadata = builder.metadata();
2056        assert_eq!(metadata.num_row_groups(), 1);
2057        let row_group = metadata.row_group(0);
2058        let column = row_group.column(0);
2059        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2060
2061        let sbbf = builder
2062            .get_row_group_column_bloom_filter(0, 0)
2063            .await
2064            .unwrap()
2065            .unwrap();
2066        assert!(sbbf.check(&"Hello"));
2067        assert!(!sbbf.check(&"Hello_Not_Exists"));
2068    }
2069
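    // Skipping and selecting individual rows of a nested (list) column across page
    // boundaries should return exactly the selected number of rows.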
2070    #[tokio::test]
2071    async fn test_nested_skip() {
2072        let schema = Arc::new(Schema::new(vec![
2073            Field::new("col_1", DataType::UInt64, false),
2074            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2075        ]));
2076
2077        // Writer properties that force small pages and a small row group size
2078        let props = WriterProperties::builder()
2079            .set_data_page_row_count_limit(256)
2080            .set_write_batch_size(256)
2081            .set_max_row_group_size(1024);
2082
2083        // Write data
2084        let mut file = tempfile().unwrap();
2085        let mut writer =
2086            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2087
2088        let mut builder = ListBuilder::new(StringBuilder::new());
2089        for id in 0..1024 {
2090            match id % 3 {
2091                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2092                1 => builder.append_value([Some(format!("id_{id}"))]),
2093                _ => builder.append_null(),
2094            }
2095        }
2096        let refs = vec![
2097            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2098            Arc::new(builder.finish()) as ArrayRef,
2099        ];
2100
2101        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2102        writer.write(&batch).unwrap();
2103        writer.close().unwrap();
2104
2105        let selections = [
2106            RowSelection::from(vec![
2107                RowSelector::skip(313),
2108                RowSelector::select(1),
2109                RowSelector::skip(709),
2110                RowSelector::select(1),
2111            ]),
2112            RowSelection::from(vec![
2113                RowSelector::skip(255),
2114                RowSelector::select(1),
2115                RowSelector::skip(767),
2116                RowSelector::select(1),
2117            ]),
2118            RowSelection::from(vec![
2119                RowSelector::select(255),
2120                RowSelector::skip(1),
2121                RowSelector::select(767),
2122                RowSelector::skip(1),
2123            ]),
2124            RowSelection::from(vec![
2125                RowSelector::skip(254),
2126                RowSelector::select(1),
2127                RowSelector::select(1),
2128                RowSelector::skip(767),
2129                RowSelector::select(1),
2130            ]),
2131        ];
2132
2133        for selection in selections {
2134            let expected = selection.row_count();
2135            // Read data
2136            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2137                tokio::fs::File::from_std(file.try_clone().unwrap()),
2138                ArrowReaderOptions::new().with_page_index(true),
2139            )
2140            .await
2141            .unwrap();
2142
2143            reader = reader.with_row_selection(selection);
2144
2145            let mut stream = reader.build().unwrap();
2146
2147            let mut total_rows = 0;
2148            while let Some(rb) = stream.next().await {
2149                let rb = rb.unwrap();
2150                total_rows += rb.num_rows();
2151            }
2152            assert_eq!(total_rows, expected);
2153        }
2154    }
2155
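    // Like `test_row_filter`, but the second predicate filters on a field nested inside a struct column.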
2156    #[tokio::test]
2157    async fn test_row_filter_nested() {
2158        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2159        let b = StructArray::from(vec![
2160            (
2161                Arc::new(Field::new("aa", DataType::Utf8, true)),
2162                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2163            ),
2164            (
2165                Arc::new(Field::new("bb", DataType::Utf8, true)),
2166                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2167            ),
2168        ]);
2169        let c = Int32Array::from_iter(0..6);
2170        let data = RecordBatch::try_from_iter([
2171            ("a", Arc::new(a) as ArrayRef),
2172            ("b", Arc::new(b) as ArrayRef),
2173            ("c", Arc::new(c) as ArrayRef),
2174        ])
2175        .unwrap();
2176
2177        let mut buf = Vec::with_capacity(1024);
2178        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2179        writer.write(&data).unwrap();
2180        writer.close().unwrap();
2181
2182        let data: Bytes = buf.into();
2183        let metadata = ParquetMetaDataReader::new()
2184            .parse_and_finish(&data)
2185            .unwrap();
2186        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2187
2188        let test = TestReader {
2189            data,
2190            metadata: Default::default(),
2191            requests: Default::default(),
2192        };
2193        let requests = test.requests.clone();
2194
2195        let a_scalar = StringArray::from_iter_values(["b"]);
2196        let a_filter = ArrowPredicateFn::new(
2197            ProjectionMask::leaves(&parquet_schema, vec![0]),
2198            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2199        );
2200
2201        let b_scalar = StringArray::from_iter_values(["4"]);
2202        let b_filter = ArrowPredicateFn::new(
2203            ProjectionMask::leaves(&parquet_schema, vec![2]),
2204            move |batch| {
2205                // Filter on the second field of the struct ("bb"), which the mask projects as column 0.
2206                let struct_array = batch
2207                    .column(0)
2208                    .as_any()
2209                    .downcast_ref::<StructArray>()
2210                    .unwrap();
2211                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2212            },
2213        );
2214
2215        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2216
2217        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2218        let stream = ParquetRecordBatchStreamBuilder::new(test)
2219            .await
2220            .unwrap()
2221            .with_projection(mask.clone())
2222            .with_batch_size(1024)
2223            .with_row_filter(filter)
2224            .build()
2225            .unwrap();
2226
2227        let batches: Vec<_> = stream.try_collect().await.unwrap();
2228        assert_eq!(batches.len(), 1);
2229
2230        let batch = &batches[0];
2231        assert_eq!(batch.num_rows(), 1);
2232        assert_eq!(batch.num_columns(), 2);
2233
2234        let col = batch.column(0);
2235        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2236        assert_eq!(val, "b");
2237
2238        let col = batch.column(1);
2239        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2240        assert_eq!(val, 3);
2241
2242        // Should only have made 3 requests
2243        assert_eq!(requests.lock().unwrap().len(), 3);
2244    }
2245
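    // Regression test: an explicitly empty offset index must not cause a panic when reading a row group.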
2246    #[tokio::test]
2247    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2248        use tokio::fs::File;
2249        let testdata = arrow::util::test_util::parquet_test_data();
2250        let path = format!("{testdata}/alltypes_plain.parquet");
2251        let mut file = File::open(&path).await.unwrap();
2252        let file_size = file.metadata().await.unwrap().len();
2253        let mut metadata = ParquetMetaDataReader::new()
2254            .with_page_indexes(true)
2255            .load_and_finish(&mut file, file_size)
2256            .await
2257            .unwrap();
2258
2259        metadata.set_offset_index(Some(vec![]));
2260        let options = ArrowReaderOptions::new().with_page_index(true);
2261        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2262        let reader =
2263            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2264                .build()
2265                .unwrap();
2266
2267        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2268        assert_eq!(result.len(), 1);
2269    }
2270
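    // Companion to the test above: a populated offset index should also read without panicking.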
2271    #[tokio::test]
2272    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2273        use tokio::fs::File;
2274        let testdata = arrow::util::test_util::parquet_test_data();
2275        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2276        let mut file = File::open(&path).await.unwrap();
2277        let file_size = file.metadata().await.unwrap().len();
2278        let metadata = ParquetMetaDataReader::new()
2279            .with_page_indexes(true)
2280            .load_and_finish(&mut file, file_size)
2281            .await
2282            .unwrap();
2283
2284        let options = ArrowReaderOptions::new().with_page_index(true);
2285        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2286        let reader =
2287            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2288                .build()
2289                .unwrap();
2290
2291        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2292        assert_eq!(result.len(), 8);
2293    }
2294
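    // Round-trip the metadata through `ParquetMetaDataWriter` and `ParquetMetaDataReader`,
    // then verify that reading the file with that metadata does not panic.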
2295    #[tokio::test]
2296    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2297        use tempfile::TempDir;
2298        use tokio::fs::File;
2299        fn write_metadata_to_local_file(
2300            metadata: ParquetMetaData,
2301            file: impl AsRef<std::path::Path>,
2302        ) {
2303            use crate::file::metadata::ParquetMetaDataWriter;
2304            use std::fs::File;
2305            let file = File::create(file).unwrap();
2306            ParquetMetaDataWriter::new(file, &metadata)
2307                .finish()
2308                .unwrap()
2309        }
2310
2311        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2312            use std::fs::File;
2313            let file = File::open(file).unwrap();
2314            ParquetMetaDataReader::new()
2315                .with_page_indexes(true)
2316                .parse_and_finish(&file)
2317                .unwrap()
2318        }
2319
2320        let testdata = arrow::util::test_util::parquet_test_data();
2321        let path = format!("{testdata}/alltypes_plain.parquet");
2322        let mut file = File::open(&path).await.unwrap();
2323        let file_size = file.metadata().await.unwrap().len();
2324        let metadata = ParquetMetaDataReader::new()
2325            .with_page_indexes(true)
2326            .load_and_finish(&mut file, file_size)
2327            .await
2328            .unwrap();
2329
2330        let tempdir = TempDir::new().unwrap();
2331        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2332        write_metadata_to_local_file(metadata, &metadata_path);
2333        let metadata = read_metadata_from_local_file(&metadata_path);
2334
2335        let options = ArrowReaderOptions::new().with_page_index(true);
2336        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2337        let reader =
2338            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2339                .build()
2340                .unwrap();
2341
2342        // This is where the panic occurred before the fix; the read should now succeed
2343        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2344        assert_eq!(result.len(), 1);
2345    }
2346}