parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{build_array_reader, RowGroups};
42use crate::arrow::arrow_reader::{
43    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
44    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
45};
46use crate::arrow::ProjectionMask;
47
48use crate::bloom_filter::{
49    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
50};
51use crate::column::page::{PageIterator, PageReader};
52use crate::errors::{ParquetError, Result};
53use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
54use crate::file::page_index::offset_index::OffsetIndexMetaData;
55use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
56use crate::file::FOOTER_SIZE;
57use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
58
59mod metadata;
60pub use metadata::*;
61
62#[cfg(feature = "object_store")]
63mod store;
64
65use crate::arrow::schema::ParquetField;
66#[cfg(feature = "object_store")]
67pub use store::*;
68
69/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
70///
71/// Notes:
72///
73/// 1. There is a default implementation for types that implement [`AsyncRead`]
74///    and [`AsyncSeek`], for example [`tokio::fs::File`].
75///
76/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
77///    is enabled, implements this interface for [`ObjectStore`].
78///
79/// [`ObjectStore`]: object_store::ObjectStore
80///
81/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
82pub trait AsyncFileReader: Send {
83    /// Retrieve the bytes in `range`
84    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
85
86    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
87    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
88        async move {
89            let mut result = Vec::with_capacity(ranges.len());
90
91            for range in ranges.into_iter() {
92                let data = self.get_bytes(range).await?;
93                result.push(data);
94            }
95
96            Ok(result)
97        }
98        .boxed()
99    }
100
101    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
102    /// allowing fine-grained control over how metadata is sourced, in particular allowing
103    /// for caching, pre-fetching, catalog metadata, etc...
104    /// ArrowReaderOptions may be provided to supply decryption parameters
105    fn get_metadata<'a>(
106        &'a mut self,
107        options: Option<&'a ArrowReaderOptions>,
108    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
109}
110
111/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
112impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
113    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
114        self.as_mut().get_bytes(range)
115    }
116
117    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
118        self.as_mut().get_byte_ranges(ranges)
119    }
120
121    fn get_metadata<'a>(
122        &'a mut self,
123        options: Option<&'a ArrowReaderOptions>,
124    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
125        self.as_mut().get_metadata(options)
126    }
127}
128
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            // Position the reader at the start of the requested range
            self.seek(SeekFrom::Start(range.start as u64)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read);
            // `take` bounds the read so `read_to_end` cannot read past `range.end`
            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
            // A short read means the file ended inside the requested range
            if read != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            // Read the fixed-size footer at the end of the file to discover
            // how long the metadata that precedes it is
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
            let metadata_len = footer.metadata_length();

            // Seek back to the start of the metadata block and read it in full
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            let metadata_reader = ParquetMetaDataReader::new();

            // With the `encryption` feature, decryption keys (if any) come
            // from the supplied reader options
            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;

            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}
180
impl ArrowReaderMetadata {
    /// Returns a new [`ArrowReaderMetadata`] for this builder
    ///
    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
    ///
    /// # Notes
    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
    /// `Self::metadata` is missing the page index, this function will attempt
    /// to load the page index by making an object store request.
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
        // took an argument to fetch the page indexes.
        let mut metadata = input.get_metadata(Some(&options)).await?;

        // If the page index was requested but the reader did not supply either
        // index, fetch it now and rebuild the metadata with the index attached
        if options.page_index
            && metadata.column_index().is_none()
            && metadata.offset_index().is_none()
        {
            // Reclaim the metadata without copying when this is the only Arc
            // reference; otherwise fall back to a clone
            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);

            #[cfg(feature = "encryption")]
            {
                reader =
                    reader.with_decryption_properties(options.file_decryption_properties.as_ref());
            }

            reader.load_page_index(input).await?;
            metadata = Arc::new(reader.finish()?)
        }
        Self::try_new(metadata, options)
    }
}
218
#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
///
/// Wraps the [`AsyncFileReader`] input handed to [`ParquetRecordBatchStreamBuilder`].
///
/// Allows sharing the same builder for both the sync and async versions, whilst also not
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
225
/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
/// This can be used to decode a Parquet file in streaming fashion (without
/// downloading the whole file at once) from a remote source, such as an object store.
///
/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc.
/// they wish to be read by the resulting stream.
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
239
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
    /// specified source.
    ///
    /// # Example
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
    /// // another async I/O reader such as a reader from an object store.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap();
    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
    /// // a stream that can be used to incrementally read the data in batches
    /// let stream = builder.build().unwrap();
    /// // In this example, we collect the stream into a Vec<RecordBatch>
    /// // but real applications would likely process the batches as they are read
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Demonstrate the results are as expected
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///      ],
    ///  );
    /// # }
    /// ```
    ///
    /// # Example configuring options and reading metadata
    ///
    /// There are many options that control the behavior of the reader, such as
    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // As before, use tokio::fs::File to read data using an async I/O.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source, in this case we set the batch size
    /// // to 3 which produces 3 rows at a time.
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .with_batch_size(3);
    ///
    /// // We can also read the metadata to inspect the schema and other metadata
    /// // before actually reading the data
    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
    ///
    /// let stream = builder.with_projection(mask).build().unwrap();
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Print out the results
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///         "+----------+-------------+-----------+",
    ///         "| bool_col | tinyint_col | float_col |",
    ///         "+----------+-------------+-----------+",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "+----------+-------------+-----------+",
    ///      ],
    ///  );
    ///
    /// // The results has 8 rows, so since we set the batch size to 3, we expect
    /// // 3 batches, two with 3 rows each and the last batch with 2 rows.
    /// assert_eq!(results.len(), 3);
    /// # }
    /// ```
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
    /// and [`ArrowReaderOptions`].
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
    ///
    /// This allows loading metadata once and using it to create multiple builders with
    /// potentially different settings, that can be read in parallel.
    ///
    /// # Example of reading from multiple streams in parallel
    ///
    /// ```
    /// # use std::fs::metadata;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
    /// # use tempfile::tempfile;
    /// # use futures::StreamExt;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # let mut file = tempfile().unwrap();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// // open file with parquet data
    /// let mut file = tokio::fs::File::from_std(file);
    /// // load metadata once
    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
    /// // create two readers, a and b, from the same underlying file
    /// // without reading the metadata again
    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
    ///     file.try_clone().await.unwrap(),
    ///     meta.clone()
    /// ).build().unwrap();
    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
    ///
    /// // Can read batches from both readers in parallel
    /// assert_eq!(
    ///   a.next().await.unwrap().unwrap(),
    ///   b.next().await.unwrap().unwrap(),
    /// );
    /// # }
    /// ```
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// We should call this function after other forms of pruning, such as projection and predicate pushdown.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        // A column chunk without a bloom filter offset has no bloom filter
        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the metadata records the filter's length, fetch header and bitset
        // in a single read; otherwise fetch enough bytes to decode the header
        // and read the bitset separately below
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Already fetched above: the bitset follows the header in `buffer`
            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`]
    ///
    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        // Validate any explicitly requested row group indexes are in bounds;
        // default to reading all row groups in order
        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Try to avoid allocating a large buffer when the file has fewer rows
        // than the configured batch size
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Ensure schema of ParquetRecordBatchStream respects projection, and does
        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}
551
552type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
553
/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
/// [`ParquetRecordBatchReader`]
struct ReaderFactory<T> {
    /// The parquet metadata of the file being read
    metadata: Arc<ParquetMetaData>,

    /// The parquet field hierarchy used to build array readers, if any
    fields: Option<Arc<ParquetField>>,

    /// The async input from which row group data is fetched
    input: T,

    /// Optional row filter evaluated before decoding (predicate pushdown)
    filter: Option<RowFilter>,

    /// Maximum number of rows remaining to be read, if a limit was requested
    limit: Option<usize>,

    /// Number of rows still to be skipped, if an offset was requested
    offset: Option<usize>,
}
569
impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
    ///
    /// Returns the factory (so it can be reused for the next row group) and a
    /// reader over this row group's data, or `None` if the selection, offset
    /// and limit leave no rows to read.
    ///
    /// Note: this captures self so that the resulting future has a static lifetime
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        // TODO: calling build_array multiple times is wasteful

        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            // schema: meta.schema_descr_ptr(),
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        // Evaluate any pushed-down predicates, narrowing `selection` after
        // each one. Only the columns each predicate references are fetched
        // at this stage.
        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Stop early once the selection excludes every row
                if !selects_any(selection.as_ref()) {
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Compute the number of rows in the selection before applying limit and offset
        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        // Compute the number of rows in the selection after applying limit and offset
        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        // Update offset if necessary
        if let Some(offset) = &mut self.offset {
            // Reduction is either because of offset or limit, as limit is applied
            // after offset has been "exhausted" can just use saturating sub here
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        // Account the rows this row group will produce against the limit
        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        // Fetch the data for the final projection and (possibly narrowed) selection
        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}
672
/// The state machine driving [`ParquetRecordBatchStream`]
enum StreamState<T> {
    /// At the start of a new row group, or the end of the parquet stream
    Init,
    /// Decoding a batch
    Decoding(ParquetRecordBatchReader),
    /// Reading data from input
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Error
    Error,
}
683
684impl<T> std::fmt::Debug for StreamState<T> {
685    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
686        match self {
687            StreamState::Init => write!(f, "StreamState::Init"),
688            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
689            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
690            StreamState::Error => write!(f, "StreamState::Error"),
691        }
692    }
693}
694
/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
///
/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
/// allowing users to decode record batches separately from I/O.
///
/// # I/O Buffering
///
/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection + filtering, etc) and decodes the rows from those buffered pages.
///
/// For example, if all rows and columns are selected, the entire row group is
/// buffered in memory during decode. This minimizes the number of IO operations
/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds
///
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub struct ParquetRecordBatchStream<T> {
    /// The parquet metadata of the file being read
    metadata: Arc<ParquetMetaData>,

    /// The projected schema of the emitted batches (metadata stripped)
    schema: SchemaRef,

    /// Indexes of the row groups remaining to be read, in read order
    row_groups: VecDeque<usize>,

    /// The columns to project
    projection: ProjectionMask,

    /// Maximum number of rows per emitted [`RecordBatch`]
    batch_size: usize,

    /// Optional selection of rows to read, consumed as row groups are read
    selection: Option<RowSelection>,

    /// This is an option so it can be moved into a future
    reader_factory: Option<ReaderFactory<T>>,

    /// The current state of the stream
    state: StreamState<T>,
}
730
731impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
732    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
733        f.debug_struct("ParquetRecordBatchStream")
734            .field("metadata", &self.metadata)
735            .field("schema", &self.schema)
736            .field("batch_size", &self.batch_size)
737            .field("projection", &self.projection)
738            .field("state", &self.state)
739            .finish()
740    }
741}
742
impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// NOTE(review): this appears to be the schema of the emitted
    /// [`RecordBatch`]es after projection — confirm against the builder.
    ///
    /// Note that the schema metadata will be stripped here. See
    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
752
impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the next row group from the stream.
    ///
    /// Users can continue to call this function to get row groups and decode them concurrently.
    ///
    /// ## Notes
    ///
    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
    ///
    /// ## Returns
    ///
    /// - `Ok(None)` if the stream has ended.
    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
    /// - `Ok(Some(reader))` which holds all the data for the row group.
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                // `Decoding`/`Reading` are only entered by the `Stream` impl;
                // mixing the two APIs is rejected rather than silently racing
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        // No row groups remain: the stream has ended
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    // Split off the prefix of the selection covering this row group's rows
                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    // The factory is moved into `read_row_group` and handed back with the result
                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            // Enter the terminal error state; later calls return Ok(None)
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // All rows skipped, read next row group
                        continue;
                    }
                }
                StreamState::Error => return Ok(None), // Ends the stream as error happens.
            }
        }
    }
}
815
impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    /// Drives the state machine: decode batches from the buffered row group if
    /// one is available, otherwise start (and then poll) the I/O for the next
    /// row group.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                // Row group fully buffered: decode batches synchronously
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        // Terminal error state; subsequent polls return None
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    // Row group exhausted; loop around to start the next one
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        // No row groups remain: end of stream
                        None => return Poll::Ready(None),
                    };

                    // The factory is moved into the future and returned with its result
                    let reader = self.reader_factory.take().expect("lost reader factory");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    // Split off the prefix of the selection covering this row group's rows
                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                // I/O in flight: suspend until the row group fetch completes
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader_factory = Some(reader_factory);
                        match maybe_reader {
                            // Read records from [`ParquetRecordBatchReader`]
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows skipped, read next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
            }
        }
    }
}
878
/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
    /// Per-column offset index for this row group, if present in the file
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Fetched data for each leaf column; `None` until fetched (or if not projected)
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group
    row_count: usize,
    /// Index of this row group within `metadata`
    row_group_idx: usize,
    /// Metadata of the parquet file being read
    metadata: &'a ParquetMetaData,
}
887
impl InMemoryRowGroup<'_> {
    /// Fetches the necessary column data into memory
    ///
    /// Only columns included in `projection` that are not already fetched are
    /// requested from `input`. When both a `RowSelection` and an offset index
    /// are available, only the byte ranges of the required pages are fetched
    /// (producing [`ColumnChunkData::Sparse`]); otherwise each projected
    /// column chunk is fetched in full ([`ColumnChunkData::Dense`]).
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
            // `RowSelection`
            let mut page_start_offsets: Vec<Vec<usize>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page.
                    let mut ranges = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start as usize..first.offset as usize);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    // Record where each fetched range starts so the loop below
                    // can reassemble ranges into per-column sparse chunks
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            // INVARIANT: this loop must skip exactly the columns skipped by the
            // `filter` above so `page_start_offsets` and `chunk_data` stay
            // aligned with their originating columns
            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                    }))
                }
            }
        } else {
            // No selection and/or no offset index: fetch each projected column
            // chunk in its entirety
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start as usize..(start + length) as usize
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            // Same alignment requirement as above: skip the same columns as the
            // `filter` that built `fetch_ranges`
            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}
981
impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Return chunks for column i
    ///
    /// Errors if `fetch` was not previously called with a projection that
    /// included column `i`.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                // Attach the decryption context for this column chunk, if any
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                // A column chunk yields exactly one page reader
                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}
1022
/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data)
        ///
        /// Must remain sorted by offset: [`ColumnChunkData::get`] locates
        /// pages via binary search on the offsets
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    Dense { offset: usize, data: Bytes },
}
1037
1038impl ColumnChunkData {
1039    fn get(&self, start: u64) -> Result<Bytes> {
1040        match &self {
1041            ColumnChunkData::Sparse { data, .. } => data
1042                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1043                .map(|idx| data[idx].1.clone())
1044                .map_err(|_| {
1045                    ParquetError::General(format!(
1046                        "Invalid offset in sparse column chunk data: {start}"
1047                    ))
1048                }),
1049            ColumnChunkData::Dense { offset, data } => {
1050                let start = start as usize - *offset;
1051                Ok(data.slice(start..))
1052            }
1053        }
1054    }
1055}
1056
1057impl Length for ColumnChunkData {
1058    fn len(&self) -> u64 {
1059        match &self {
1060            ColumnChunkData::Sparse { length, .. } => *length as u64,
1061            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1062        }
1063    }
1064}
1065
1066impl ChunkReader for ColumnChunkData {
1067    type T = bytes::buf::Reader<Bytes>;
1068
1069    fn get_read(&self, start: u64) -> Result<Self::T> {
1070        Ok(self.get(start)?.reader())
1071    }
1072
1073    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1074        Ok(self.get(start)?.slice(..length))
1075    }
1076}
1077
1078/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1079struct ColumnChunkIterator {
1080    reader: Option<Result<Box<dyn PageReader>>>,
1081}
1082
1083impl Iterator for ColumnChunkIterator {
1084    type Item = Result<Box<dyn PageReader>>;
1085
1086    fn next(&mut self) -> Option<Self::Item> {
1087        self.reader.take()
1088    }
1089}
1090
1091impl PageIterator for ColumnChunkIterator {}
1092
1093#[cfg(test)]
1094mod tests {
1095    use super::*;
1096    use crate::arrow::arrow_reader::{
1097        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1098    };
1099    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1100    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1101    use crate::arrow::ArrowWriter;
1102    use crate::file::metadata::ParquetMetaDataReader;
1103    use crate::file::properties::WriterProperties;
1104    use arrow::compute::kernels::cmp::eq;
1105    use arrow::error::Result as ArrowResult;
1106    use arrow_array::builder::{ListBuilder, StringBuilder};
1107    use arrow_array::cast::AsArray;
1108    use arrow_array::types::Int32Type;
1109    use arrow_array::{
1110        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1111        StructArray, UInt64Array,
1112    };
1113    use arrow_schema::{DataType, Field, Schema};
1114    use futures::{StreamExt, TryStreamExt};
1115    use rand::{rng, Rng};
1116    use std::collections::HashMap;
1117    use std::sync::{Arc, Mutex};
1118    use tempfile::tempfile;
1119
    // In-memory `AsyncFileReader` that serves a parquet file from a buffer
    // and records every byte range requested, so tests can assert on I/O.
    #[derive(Clone)]
    struct TestReader {
        // Entire parquet file contents
        data: Bytes,
        // Pre-parsed metadata returned by `get_metadata`
        metadata: Arc<ParquetMetaData>,
        // Log of byte ranges requested via `get_bytes`
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }
1126
1127    impl AsyncFileReader for TestReader {
1128        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
1129            self.requests.lock().unwrap().push(range.clone());
1130            futures::future::ready(Ok(self.data.slice(range))).boxed()
1131        }
1132
1133        fn get_metadata<'a>(
1134            &'a mut self,
1135            _options: Option<&'a ArrowReaderOptions>,
1136        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1137            futures::future::ready(Ok(self.metadata.clone())).boxed()
1138        }
1139    }
1140
1141    #[tokio::test]
1142    async fn test_async_reader() {
1143        let testdata = arrow::util::test_util::parquet_test_data();
1144        let path = format!("{testdata}/alltypes_plain.parquet");
1145        let data = Bytes::from(std::fs::read(path).unwrap());
1146
1147        let metadata = ParquetMetaDataReader::new()
1148            .parse_and_finish(&data)
1149            .unwrap();
1150        let metadata = Arc::new(metadata);
1151
1152        assert_eq!(metadata.num_row_groups(), 1);
1153
1154        let async_reader = TestReader {
1155            data: data.clone(),
1156            metadata: metadata.clone(),
1157            requests: Default::default(),
1158        };
1159
1160        let requests = async_reader.requests.clone();
1161        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1162            .await
1163            .unwrap();
1164
1165        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1166        let stream = builder
1167            .with_projection(mask.clone())
1168            .with_batch_size(1024)
1169            .build()
1170            .unwrap();
1171
1172        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1173
1174        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1175            .unwrap()
1176            .with_projection(mask)
1177            .with_batch_size(104)
1178            .build()
1179            .unwrap()
1180            .collect::<ArrowResult<Vec<_>>>()
1181            .unwrap();
1182
1183        assert_eq!(async_batches, sync_batches);
1184
1185        let requests = requests.lock().unwrap();
1186        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1187        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1188
1189        assert_eq!(
1190            &requests[..],
1191            &[
1192                offset_1 as usize..(offset_1 + length_1) as usize,
1193                offset_2 as usize..(offset_2 + length_2) as usize
1194            ]
1195        );
1196    }
1197
1198    #[tokio::test]
1199    async fn test_async_reader_with_next_row_group() {
1200        let testdata = arrow::util::test_util::parquet_test_data();
1201        let path = format!("{testdata}/alltypes_plain.parquet");
1202        let data = Bytes::from(std::fs::read(path).unwrap());
1203
1204        let metadata = ParquetMetaDataReader::new()
1205            .parse_and_finish(&data)
1206            .unwrap();
1207        let metadata = Arc::new(metadata);
1208
1209        assert_eq!(metadata.num_row_groups(), 1);
1210
1211        let async_reader = TestReader {
1212            data: data.clone(),
1213            metadata: metadata.clone(),
1214            requests: Default::default(),
1215        };
1216
1217        let requests = async_reader.requests.clone();
1218        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1219            .await
1220            .unwrap();
1221
1222        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1223        let mut stream = builder
1224            .with_projection(mask.clone())
1225            .with_batch_size(1024)
1226            .build()
1227            .unwrap();
1228
1229        let mut readers = vec![];
1230        while let Some(reader) = stream.next_row_group().await.unwrap() {
1231            readers.push(reader);
1232        }
1233
1234        let async_batches: Vec<_> = readers
1235            .into_iter()
1236            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1237            .collect();
1238
1239        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1240            .unwrap()
1241            .with_projection(mask)
1242            .with_batch_size(104)
1243            .build()
1244            .unwrap()
1245            .collect::<ArrowResult<Vec<_>>>()
1246            .unwrap();
1247
1248        assert_eq!(async_batches, sync_batches);
1249
1250        let requests = requests.lock().unwrap();
1251        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1252        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1253
1254        assert_eq!(
1255            &requests[..],
1256            &[
1257                offset_1 as usize..(offset_1 + length_1) as usize,
1258                offset_2 as usize..(offset_2 + length_2) as usize
1259            ]
1260        );
1261    }
1262
    #[tokio::test]
    async fn test_async_reader_with_index() {
        // Verifies `with_page_index(true)` loads offset and column indexes for
        // every row group/column, and results still match the sync reader.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        // The builder should have page and offset indexes loaded now
        let metadata_with_index = builder.metadata();

        // Check offset indexes are present for all columns
        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Check page indexes are present for all columns
        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1330
    #[tokio::test]
    async fn test_async_reader_with_limit() {
        // Verifies `with_limit` produces identical results on the async and
        // sync readers.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1376
    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        // Exercises page skipping driven by a `RowSelection` + page index:
        // selectors are crafted to hit page boundaries exactly, span multiple
        // pages, and cross boundaries mid-page. Selector counts appear to sum
        // to the file's total row count (7300) — confirm against test data.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        // Page index is required for page-level skipping
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let selection = RowSelection::from(vec![
            RowSelector::skip(21),   // Skip first page
            RowSelector::select(21), // Select page to boundary
            RowSelector::skip(41),   // Skip multiple pages
            RowSelector::select(41), // Select multiple pages
            RowSelector::skip(25),   // Skip page across boundary
            RowSelector::select(25), // Select across page boundary
            RowSelector::skip(7116), // Skip to final page boundary
            RowSelector::select(10), // Select final page
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1434
    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        // Fuzz test: 100 iterations of random alternating skip/select
        // selections over a random column, checking the total selected row
        // count comes back from the async reader.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            // Build alternating skip/select runs of 1..100 rows covering the
            // whole file (7300 rows)
            while total_rows < 7300 {
                let row_count: usize = rand.random_range(1..100);

                // Clamp the last run so the selection covers exactly 7300 rows
                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader {
                data: data.clone(),
                metadata: metadata.clone(),
                requests: Default::default(),
            };

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            // Project a single random leaf column each iteration
            let col_idx: usize = rand.random_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }
1500
    #[tokio::test]
    async fn test_async_reader_zero_row_selector() {
        // Regression test: a leading zero-row selector must not break row
        // selection handling.
        //See https://github.com/apache/arrow-rs/issues/2669
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = rng();

        let mut expected_rows = 0;
        let mut total_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];

        // The zero-row selector that previously triggered the bug
        selectors.push(RowSelector {
            row_count: 0,
            skip: false,
        });

        // Fill out the rest with random alternating skip/select runs
        while total_rows < 7300 {
            let row_count: usize = rand.random_range(1..100);

            let row_count = row_count.min(7300 - total_rows);

            selectors.push(RowSelector { row_count, skip });

            total_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }

            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let col_idx: usize = rand.random_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }
1570
1571    #[tokio::test]
1572    async fn test_row_filter() {
1573        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1574        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1575        let c = Int32Array::from_iter(0..6);
1576        let data = RecordBatch::try_from_iter([
1577            ("a", Arc::new(a) as ArrayRef),
1578            ("b", Arc::new(b) as ArrayRef),
1579            ("c", Arc::new(c) as ArrayRef),
1580        ])
1581        .unwrap();
1582
1583        let mut buf = Vec::with_capacity(1024);
1584        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1585        writer.write(&data).unwrap();
1586        writer.close().unwrap();
1587
1588        let data: Bytes = buf.into();
1589        let metadata = ParquetMetaDataReader::new()
1590            .parse_and_finish(&data)
1591            .unwrap();
1592        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1593
1594        let test = TestReader {
1595            data,
1596            metadata: Arc::new(metadata),
1597            requests: Default::default(),
1598        };
1599        let requests = test.requests.clone();
1600
1601        let a_scalar = StringArray::from_iter_values(["b"]);
1602        let a_filter = ArrowPredicateFn::new(
1603            ProjectionMask::leaves(&parquet_schema, vec![0]),
1604            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1605        );
1606
1607        let b_scalar = StringArray::from_iter_values(["4"]);
1608        let b_filter = ArrowPredicateFn::new(
1609            ProjectionMask::leaves(&parquet_schema, vec![1]),
1610            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1611        );
1612
1613        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1614
1615        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1616        let stream = ParquetRecordBatchStreamBuilder::new(test)
1617            .await
1618            .unwrap()
1619            .with_projection(mask.clone())
1620            .with_batch_size(1024)
1621            .with_row_filter(filter)
1622            .build()
1623            .unwrap();
1624
1625        let batches: Vec<_> = stream.try_collect().await.unwrap();
1626        assert_eq!(batches.len(), 1);
1627
1628        let batch = &batches[0];
1629        assert_eq!(batch.num_rows(), 1);
1630        assert_eq!(batch.num_columns(), 2);
1631
1632        let col = batch.column(0);
1633        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1634        assert_eq!(val, "b");
1635
1636        let col = batch.column(1);
1637        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1638        assert_eq!(val, 3);
1639
1640        // Should only have made 3 requests
1641        assert_eq!(requests.lock().unwrap().len(), 3);
1642    }
1643
1644    #[tokio::test]
1645    async fn test_limit_multiple_row_groups() {
1646        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1647        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1648        let c = Int32Array::from_iter(0..6);
1649        let data = RecordBatch::try_from_iter([
1650            ("a", Arc::new(a) as ArrayRef),
1651            ("b", Arc::new(b) as ArrayRef),
1652            ("c", Arc::new(c) as ArrayRef),
1653        ])
1654        .unwrap();
1655
1656        let mut buf = Vec::with_capacity(1024);
1657        let props = WriterProperties::builder()
1658            .set_max_row_group_size(3)
1659            .build();
1660        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1661        writer.write(&data).unwrap();
1662        writer.close().unwrap();
1663
1664        let data: Bytes = buf.into();
1665        let metadata = ParquetMetaDataReader::new()
1666            .parse_and_finish(&data)
1667            .unwrap();
1668
1669        assert_eq!(metadata.num_row_groups(), 2);
1670
1671        let test = TestReader {
1672            data,
1673            metadata: Arc::new(metadata),
1674            requests: Default::default(),
1675        };
1676
1677        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1678            .await
1679            .unwrap()
1680            .with_batch_size(1024)
1681            .with_limit(4)
1682            .build()
1683            .unwrap();
1684
1685        let batches: Vec<_> = stream.try_collect().await.unwrap();
1686        // Expect one batch for each row group
1687        assert_eq!(batches.len(), 2);
1688
1689        let batch = &batches[0];
1690        // First batch should contain all rows
1691        assert_eq!(batch.num_rows(), 3);
1692        assert_eq!(batch.num_columns(), 3);
1693        let col2 = batch.column(2).as_primitive::<Int32Type>();
1694        assert_eq!(col2.values(), &[0, 1, 2]);
1695
1696        let batch = &batches[1];
1697        // Second batch should trigger the limit and only have one row
1698        assert_eq!(batch.num_rows(), 1);
1699        assert_eq!(batch.num_columns(), 3);
1700        let col2 = batch.column(2).as_primitive::<Int32Type>();
1701        assert_eq!(col2.values(), &[3]);
1702
1703        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1704            .await
1705            .unwrap()
1706            .with_offset(2)
1707            .with_limit(3)
1708            .build()
1709            .unwrap();
1710
1711        let batches: Vec<_> = stream.try_collect().await.unwrap();
1712        // Expect one batch for each row group
1713        assert_eq!(batches.len(), 2);
1714
1715        let batch = &batches[0];
1716        // First batch should contain one row
1717        assert_eq!(batch.num_rows(), 1);
1718        assert_eq!(batch.num_columns(), 3);
1719        let col2 = batch.column(2).as_primitive::<Int32Type>();
1720        assert_eq!(col2.values(), &[2]);
1721
1722        let batch = &batches[1];
1723        // Second batch should contain two rows
1724        assert_eq!(batch.num_rows(), 2);
1725        assert_eq!(batch.num_columns(), 3);
1726        let col2 = batch.column(2).as_primitive::<Int32Type>();
1727        assert_eq!(col2.values(), &[3, 4]);
1728
1729        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1730            .await
1731            .unwrap()
1732            .with_offset(4)
1733            .with_limit(20)
1734            .build()
1735            .unwrap();
1736
1737        let batches: Vec<_> = stream.try_collect().await.unwrap();
1738        // Should skip first row group
1739        assert_eq!(batches.len(), 1);
1740
1741        let batch = &batches[0];
1742        // First batch should contain two rows
1743        assert_eq!(batch.num_rows(), 2);
1744        assert_eq!(batch.num_columns(), 3);
1745        let col2 = batch.column(2).as_primitive::<Int32Type>();
1746        assert_eq!(col2.values(), &[4, 5]);
1747    }
1748
1749    #[tokio::test]
1750    async fn test_row_filter_with_index() {
1751        let testdata = arrow::util::test_util::parquet_test_data();
1752        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1753        let data = Bytes::from(std::fs::read(path).unwrap());
1754
1755        let metadata = ParquetMetaDataReader::new()
1756            .parse_and_finish(&data)
1757            .unwrap();
1758        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1759        let metadata = Arc::new(metadata);
1760
1761        assert_eq!(metadata.num_row_groups(), 1);
1762
1763        let async_reader = TestReader {
1764            data: data.clone(),
1765            metadata: metadata.clone(),
1766            requests: Default::default(),
1767        };
1768
1769        let a_filter =
1770            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1771                Ok(batch.column(0).as_boolean().clone())
1772            });
1773
1774        let b_scalar = Int8Array::from(vec![2]);
1775        let b_filter = ArrowPredicateFn::new(
1776            ProjectionMask::leaves(&parquet_schema, vec![2]),
1777            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1778        );
1779
1780        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1781
1782        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1783
1784        let options = ArrowReaderOptions::new().with_page_index(true);
1785        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1786            .await
1787            .unwrap()
1788            .with_projection(mask.clone())
1789            .with_batch_size(1024)
1790            .with_row_filter(filter)
1791            .build()
1792            .unwrap();
1793
1794        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1795
1796        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1797
1798        assert_eq!(total_rows, 730);
1799    }
1800
    /// Verify that `ReaderFactory::read_row_group` with a sparse `RowSelection`
    /// only fetches the byte ranges of the pages that are actually selected
    /// (checked against the requests recorded by `TestReader`).
    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        // Load metadata together with the page indexes - the offset index is
        // needed below to locate individual page byte ranges
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&data)
            .unwrap();

        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

        // Rebuild the metadata so it contains only the first row group, with
        // an offset index but no column index
        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        // Keep a handle to the request log so the fetched byte ranges can be
        // inspected after the read
        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        // Only read the first leaf column
        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        // Setup `RowSelection` so that we can skip every other page, selecting the last page
        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            // A page's row count is the distance to the next page's first row
            // (or to the end of the row group for the final page)
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                // A selected page should be fetched with exactly this range
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        let requests = requests.lock().unwrap();

        // Only the selected pages' byte ranges should have been requested
        assert_eq!(&requests[..], &expected_page_requests)
    }
1893
1894    #[tokio::test]
1895    async fn test_batch_size_overallocate() {
1896        let testdata = arrow::util::test_util::parquet_test_data();
1897        // `alltypes_plain.parquet` only have 8 rows
1898        let path = format!("{testdata}/alltypes_plain.parquet");
1899        let data = Bytes::from(std::fs::read(path).unwrap());
1900
1901        let metadata = ParquetMetaDataReader::new()
1902            .parse_and_finish(&data)
1903            .unwrap();
1904        let file_rows = metadata.file_metadata().num_rows() as usize;
1905        let metadata = Arc::new(metadata);
1906
1907        let async_reader = TestReader {
1908            data: data.clone(),
1909            metadata: metadata.clone(),
1910            requests: Default::default(),
1911        };
1912
1913        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1914            .await
1915            .unwrap();
1916
1917        let stream = builder
1918            .with_projection(ProjectionMask::all())
1919            .with_batch_size(1024)
1920            .build()
1921            .unwrap();
1922        assert_ne!(1024, file_rows);
1923        assert_eq!(stream.batch_size, file_rows);
1924    }
1925
1926    #[tokio::test]
1927    async fn test_get_row_group_column_bloom_filter_without_length() {
1928        let testdata = arrow::util::test_util::parquet_test_data();
1929        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1930        let data = Bytes::from(std::fs::read(path).unwrap());
1931        test_get_row_group_column_bloom_filter(data, false).await;
1932    }
1933
    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        // Collects the names of all fields, nested ones included, by
        // flattening the schema
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        // ParquetRecordBatchReaderBuilder::schema differs from
        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
        // schema contents (in terms of custom metadata attached to schema, and fields
        // returned). Test to ensure this remains consistent behaviour.
        //
        // Ensure same for asynchronous versions of the above.

        // Prep data, for a schema with nested fields, with custom metadata
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write parquet with custom metadata in schema
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices in mask, expected names in output schema all fields)
        // Leaf ordering: 0 -> "a", 1 -> "b", 2 -> "c.d", 3 -> "c.e"; selecting
        // a nested leaf also surfaces its parent struct "c" in the output
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        // Ensure we're consistent for each of these projections
        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                // Builder schema should preserve all fields and metadata
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                // Reader & batch schema should show only projected fields, and no metadata
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            // Synchronous reader path
            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // asynchronous should be same
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }
2041
2042    #[tokio::test]
2043    async fn test_get_row_group_column_bloom_filter_with_length() {
2044        // convert to new parquet file with bloom_filter_length
2045        let testdata = arrow::util::test_util::parquet_test_data();
2046        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2047        let data = Bytes::from(std::fs::read(path).unwrap());
2048        let metadata = ParquetMetaDataReader::new()
2049            .parse_and_finish(&data)
2050            .unwrap();
2051        let metadata = Arc::new(metadata);
2052        let async_reader = TestReader {
2053            data: data.clone(),
2054            metadata: metadata.clone(),
2055            requests: Default::default(),
2056        };
2057        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2058            .await
2059            .unwrap();
2060        let schema = builder.schema().clone();
2061        let stream = builder.build().unwrap();
2062        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2063
2064        let mut parquet_data = Vec::new();
2065        let props = WriterProperties::builder()
2066            .set_bloom_filter_enabled(true)
2067            .build();
2068        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2069        for batch in batches {
2070            writer.write(&batch).unwrap();
2071        }
2072        writer.close().unwrap();
2073
2074        // test the new parquet file
2075        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2076    }
2077
2078    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2079        let metadata = ParquetMetaDataReader::new()
2080            .parse_and_finish(&data)
2081            .unwrap();
2082        let metadata = Arc::new(metadata);
2083
2084        assert_eq!(metadata.num_row_groups(), 1);
2085        let row_group = metadata.row_group(0);
2086        let column = row_group.column(0);
2087        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2088
2089        let async_reader = TestReader {
2090            data: data.clone(),
2091            metadata: metadata.clone(),
2092            requests: Default::default(),
2093        };
2094
2095        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2096            .await
2097            .unwrap();
2098
2099        let sbbf = builder
2100            .get_row_group_column_bloom_filter(0, 0)
2101            .await
2102            .unwrap()
2103            .unwrap();
2104        assert!(sbbf.check(&"Hello"));
2105        assert!(!sbbf.check(&"Hello_Not_Exists"));
2106    }
2107
    /// Verify row skipping over a nested (list-of-strings) column: each
    /// `RowSelection` read must yield exactly `selection.row_count()` rows.
    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        // Default writer properties
        // Small pages (at most 256 rows each) within a single 1024-row group
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        // Write data
        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        // Rows cycle through: two-element list, one-element list, null
        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Each case totals 1024 rows; the skip/select boundaries appear chosen
        // to land on or straddle the 256-row page limit set above
        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            // Read data
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            // Count the rows actually produced under the selection
            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }
2193
    /// Like `test_row_filter`, but the second predicate is evaluated against a
    /// leaf nested inside a struct column.
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        // Serialize the batch into an in-memory parquet file
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        // Predicate 1: leaf 0 ("a") == "b"
        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        // Predicate 2: leaf 2 ("b.bb", the struct's second child) == "4"
        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                // Filter on the second element of the struct.
                // After projection the struct has a single child, so it is
                // accessed as column(0) of the downcast struct
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        // Project leaves 0 ("a") and 3 ("c")
        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        // Exactly one row (a == "b" && b.bb == "4") survives both predicates
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Should only have made 3 requests
        assert_eq!(requests.lock().unwrap().len(), 3);
    }
2283
2284    #[tokio::test]
2285    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2286        use tokio::fs::File;
2287        let testdata = arrow::util::test_util::parquet_test_data();
2288        let path = format!("{testdata}/alltypes_plain.parquet");
2289        let mut file = File::open(&path).await.unwrap();
2290        let file_size = file.metadata().await.unwrap().len();
2291        let mut metadata = ParquetMetaDataReader::new()
2292            .with_page_indexes(true)
2293            .load_and_finish(&mut file, file_size as usize)
2294            .await
2295            .unwrap();
2296
2297        metadata.set_offset_index(Some(vec![]));
2298        let options = ArrowReaderOptions::new().with_page_index(true);
2299        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2300        let reader =
2301            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2302                .build()
2303                .unwrap();
2304
2305        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2306        assert_eq!(result.len(), 1);
2307    }
2308
2309    #[tokio::test]
2310    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2311        use tokio::fs::File;
2312        let testdata = arrow::util::test_util::parquet_test_data();
2313        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2314        let mut file = File::open(&path).await.unwrap();
2315        let file_size = file.metadata().await.unwrap().len();
2316        let metadata = ParquetMetaDataReader::new()
2317            .with_page_indexes(true)
2318            .load_and_finish(&mut file, file_size as usize)
2319            .await
2320            .unwrap();
2321
2322        let options = ArrowReaderOptions::new().with_page_index(true);
2323        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2324        let reader =
2325            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2326                .build()
2327                .unwrap();
2328
2329        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2330        assert_eq!(result.len(), 8);
2331    }
2332
2333    #[tokio::test]
2334    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2335        use tempfile::TempDir;
2336        use tokio::fs::File;
2337        fn write_metadata_to_local_file(
2338            metadata: ParquetMetaData,
2339            file: impl AsRef<std::path::Path>,
2340        ) {
2341            use crate::file::metadata::ParquetMetaDataWriter;
2342            use std::fs::File;
2343            let file = File::create(file).unwrap();
2344            ParquetMetaDataWriter::new(file, &metadata)
2345                .finish()
2346                .unwrap()
2347        }
2348
2349        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2350            use std::fs::File;
2351            let file = File::open(file).unwrap();
2352            ParquetMetaDataReader::new()
2353                .with_page_indexes(true)
2354                .parse_and_finish(&file)
2355                .unwrap()
2356        }
2357
2358        let testdata = arrow::util::test_util::parquet_test_data();
2359        let path = format!("{testdata}/alltypes_plain.parquet");
2360        let mut file = File::open(&path).await.unwrap();
2361        let file_size = file.metadata().await.unwrap().len();
2362        let metadata = ParquetMetaDataReader::new()
2363            .with_page_indexes(true)
2364            .load_and_finish(&mut file, file_size as usize)
2365            .await
2366            .unwrap();
2367
2368        let tempdir = TempDir::new().unwrap();
2369        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2370        write_metadata_to_local_file(metadata, &metadata_path);
2371        let metadata = read_metadata_from_local_file(&metadata_path);
2372
2373        let options = ArrowReaderOptions::new().with_page_index(true);
2374        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2375        let reader =
2376            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2377                .build()
2378                .unwrap();
2379
2380        // Panics here
2381        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2382        assert_eq!(result.len(), 1);
2383    }
2384}