parquet/arrow/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
19//! [`RecordBatch`]es
20//!
21//! This can be used to decode a Parquet file in streaming fashion (without
22//! downloading the whole file at once) from a remote source, such as an object store.
23//!
24//! See example on [`ParquetRecordBatchStreamBuilder::new`]
25
26use std::collections::VecDeque;
27use std::fmt::Formatter;
28use std::io::SeekFrom;
29use std::ops::Range;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33
34use bytes::{Buf, Bytes};
35use futures::future::{BoxFuture, FutureExt};
36use futures::ready;
37use futures::stream::Stream;
38use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
39
40use arrow_array::RecordBatch;
41use arrow_schema::{DataType, Fields, Schema, SchemaRef};
42
43use crate::arrow::array_reader::{build_array_reader, RowGroups};
44use crate::arrow::arrow_reader::{
45    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
46    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::file::FOOTER_SIZE;
59use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
60
61mod metadata;
62pub use metadata::*;
63
64#[cfg(feature = "object_store")]
65mod store;
66
67use crate::arrow::schema::ParquetField;
68#[cfg(feature = "object_store")]
69pub use store::*;
70
71/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
72///
73/// Notes:
74///
75/// 1. There is a default implementation for types that implement [`AsyncRead`]
76///    and [`AsyncSeek`], for example [`tokio::fs::File`].
77///
78/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
79///    is enabled, implements this interface for [`ObjectStore`].
80///
81/// [`ObjectStore`]: object_store::ObjectStore
82///
83/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
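///
/// # Example: a minimal in-memory implementation (sketch)
///
/// The sketch below shows one way this trait could be implemented for data that
/// is already resident in memory. The `InMemoryReader` type is hypothetical, and
/// the metadata is re-parsed on every call purely for brevity.
///
/// ```
/// use std::ops::Range;
/// use std::sync::Arc;
/// use bytes::Bytes;
/// use futures::future::{BoxFuture, FutureExt};
/// use parquet::arrow::async_reader::AsyncFileReader;
/// use parquet::errors::Result;
/// use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
///
/// struct InMemoryReader {
///     data: Bytes,
/// }
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
///         // The data is already in memory, so resolve immediately
///         futures::future::ready(Ok(self.data.slice(range))).boxed()
///     }
///
///     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
///         let data = self.data.clone();
///         async move {
///             // Re-parse the footer on every call; a real implementation would cache this
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```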
84pub trait AsyncFileReader: Send {
85    /// Retrieve the bytes in `range`
86    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
87
88    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
89    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
90        async move {
91            let mut result = Vec::with_capacity(ranges.len());
92
93            for range in ranges.into_iter() {
94                let data = self.get_bytes(range).await?;
95                result.push(data);
96            }
97
98            Ok(result)
99        }
100        .boxed()
101    }
102
103    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
104    /// allowing fine-grained control over how the metadata is sourced, for example
105    /// to support caching, pre-fetching, catalog-provided metadata, etc.
106    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
107}
108
109/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
110impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
111    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
112        self.as_mut().get_bytes(range)
113    }
114
115    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
116        self.as_mut().get_byte_ranges(ranges)
117    }
118
119    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
120        self.as_mut().get_metadata()
121    }
122}
123
124impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
125    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
126        async move {
127            self.seek(SeekFrom::Start(range.start as u64)).await?;
128
129            let to_read = range.end - range.start;
130            let mut buffer = Vec::with_capacity(to_read);
131            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
132            if read != to_read {
133                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
134            }
135
136            Ok(buffer.into())
137        }
138        .boxed()
139    }
140
141    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
142        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
143        async move {
144            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
145
146            let mut buf = [0_u8; FOOTER_SIZE];
147            self.read_exact(&mut buf).await?;
148
149            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
150            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
151                .await?;
152
153            let mut buf = Vec::with_capacity(metadata_len);
154            self.take(metadata_len as _).read_to_end(&mut buf).await?;
155
156            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
157        }
158        .boxed()
159    }
160}
161
162impl ArrowReaderMetadata {
163    /// Returns a new [`ArrowReaderMetadata`] loaded from the provided [`AsyncFileReader`]
164    ///
165    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
166    ///
167    /// # Notes
168    ///
169    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
170    /// `Self::metadata` is missing the page index, this function will attempt
171    /// to load the page index with additional requests to the underlying reader.
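    ///
    /// # Example (sketch)
    ///
    /// A brief sketch of loading the metadata, including the page index, from an
    /// async source; it assumes the `alltypes_tiny_pages_plain.parquet` test file,
    /// which contains a page index.
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
    /// let mut file = tokio::fs::File::open(path).await.unwrap();
    /// // Request the page index so later readers can prune individual pages
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let metadata = ArrowReaderMetadata::load_async(&mut file, options).await.unwrap();
    /// assert!(metadata.metadata().offset_index().is_some());
    /// # }
    /// ```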
172    pub async fn load_async<T: AsyncFileReader>(
173        input: &mut T,
174        options: ArrowReaderOptions,
175    ) -> Result<Self> {
176        // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
177        // took an argument to fetch the page indexes.
178        let mut metadata = input.get_metadata().await?;
179
180        if options.page_index
181            && metadata.column_index().is_none()
182            && metadata.offset_index().is_none()
183        {
184            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
185            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
186            reader.load_page_index(input).await?;
187            metadata = Arc::new(reader.finish()?)
188        }
189        Self::try_new(metadata, options)
190    }
191}
192
193#[doc(hidden)]
194/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
195///
196/// Allows sharing the same builder for both the sync and async versions, whilst also not
197/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
198pub struct AsyncReader<T>(T);
199
200/// A builder for reading parquet files from an `async` source as a [`ParquetRecordBatchStream`]
201///
202/// This builder handles reading the parquet file metadata, allowing consumers
203/// to use this information to select which specific columns, row groups, etc.
204/// they wish the resulting stream to read
205///
206/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
207///
208/// See [`ArrowReaderBuilder`] for additional member functions
209pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
210
211impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
212    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
213    /// specified source.
214    ///
215    /// # Example
216    /// ```
217    /// # #[tokio::main(flavor="current_thread")]
218    /// # async fn main() {
219    /// #
220    /// # use arrow_array::RecordBatch;
221    /// # use arrow::util::pretty::pretty_format_batches;
222    /// # use futures::TryStreamExt;
223    /// #
224    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
225    /// #
226    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
227    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
228    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
229    /// #     assert_eq!(
230    /// #          &actual_lines, expected_lines,
231    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
232    /// #          expected_lines, actual_lines
233    /// #      );
234    /// #  }
235    /// #
236    /// # let testdata = arrow::util::test_util::parquet_test_data();
237    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
238    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
239    /// // another async I/O reader, such as a reader from an object store.
240    /// let file = tokio::fs::File::open(path).await.unwrap();
241    ///
242    /// // Configure options for reading from the async source
243    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
244    ///     .await
245    ///     .unwrap();
246    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
247    /// // a stream that can be used to incrementally read the data in batches
248    /// let stream = builder.build().unwrap();
249    /// // In this example, we collect the stream into a Vec<RecordBatch>
250    /// // but real applications would likely process the batches as they are read
251    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
252    /// // Demonstrate the results are as expected
253    /// assert_batches_eq(
254    ///     &results,
255    ///     &[
256    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
257    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
258    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
259    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
260    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
261    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
262    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
263    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
264    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
265    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
266    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
267    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
268    ///      ],
269    ///  );
270    /// # }
271    /// ```
272    ///
273    /// # Example configuring options and reading metadata
274    ///
275    /// There are many options that control the behavior of the reader, such as
276    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
277    ///
278    /// ```
279    /// # #[tokio::main(flavor="current_thread")]
280    /// # async fn main() {
281    /// #
282    /// # use arrow_array::RecordBatch;
283    /// # use arrow::util::pretty::pretty_format_batches;
284    /// # use futures::TryStreamExt;
285    /// #
286    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
287    /// #
288    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
289    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
290    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
291    /// #     assert_eq!(
292    /// #          &actual_lines, expected_lines,
293    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
294    /// #          expected_lines, actual_lines
295    /// #      );
296    /// #  }
297    /// #
298    /// # let testdata = arrow::util::test_util::parquet_test_data();
299    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
300    /// // As before, use tokio::fs::File to read data using async I/O.
301    /// let file = tokio::fs::File::open(path).await.unwrap();
302    ///
303    /// // Configure options for reading from the async source; in this case we set the batch size
304    /// // to 3, which produces batches of at most 3 rows at a time.
305    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
306    ///     .await
307    ///     .unwrap()
308    ///     .with_batch_size(3);
309    ///
310    /// // We can also inspect the metadata, such as the schema, before
311    /// // actually reading the data
312    /// let file_metadata = builder.metadata().file_metadata();
313    /// // Specify that we only want to read the columns at indices 1, 2, and 6 (bool_col, tinyint_col, and float_col)
314    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
315    ///
316    /// let stream = builder.with_projection(mask).build().unwrap();
317    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
318    /// // Print out the results
319    /// assert_batches_eq(
320    ///     &results,
321    ///     &[
322    ///         "+----------+-------------+-----------+",
323    ///         "| bool_col | tinyint_col | float_col |",
324    ///         "+----------+-------------+-----------+",
325    ///         "| true     | 0           | 0.0       |",
326    ///         "| false    | 1           | 1.1       |",
327    ///         "| true     | 0           | 0.0       |",
328    ///         "| false    | 1           | 1.1       |",
329    ///         "| true     | 0           | 0.0       |",
330    ///         "| false    | 1           | 1.1       |",
331    ///         "| true     | 0           | 0.0       |",
332    ///         "| false    | 1           | 1.1       |",
333    ///         "+----------+-------------+-----------+",
334    ///      ],
335    ///  );
336    ///
337    /// // The results have 8 rows, so since we set the batch size to 3, we expect
338    /// // 3 batches: two with 3 rows each and the last batch with 2 rows.
339    /// assert_eq!(results.len(), 3);
340    /// # }
341    /// ```
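    ///
    /// # Example applying a row filter (sketch)
    ///
    /// Rows can also be pruned before they are decoded by pushing a predicate down
    /// with [`RowFilter`]. The sketch below keeps only rows where `id > 3` in the
    /// same `alltypes_plain.parquet` test file used above; the column index and the
    /// expected row count are specific to that file.
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_array::Int32Array;
    /// # use futures::TryStreamExt;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// let file = tokio::fs::File::open(path).await.unwrap();
    /// let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    ///
    /// // Evaluate the predicate only against the `id` column (leaf index 0)
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    ///     let id = batch.column(0).as_primitive::<Int32Type>();
    ///     arrow::compute::kernels::cmp::gt(id, &Int32Array::new_scalar(3))
    /// });
    ///
    /// let stream = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    /// let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    /// assert_eq!(total_rows, 4); // ids 4, 5, 6 and 7
    /// # }
    /// ```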
342    pub async fn new(input: T) -> Result<Self> {
343        Self::new_with_options(input, Default::default()).await
344    }
345
346    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
347    /// and [`ArrowReaderOptions`]
348    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
349        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
350        Ok(Self::new_with_metadata(input, metadata))
351    }
352
353    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
354    ///
355    /// This allows loading the metadata once and using it to create multiple builders with
356    /// potentially different settings, which can then be read in parallel.
357    ///
358    /// # Example of reading from multiple streams in parallel
359    ///
360    /// ```
361    /// # use std::fs::metadata;
362    /// # use std::sync::Arc;
363    /// # use bytes::Bytes;
364    /// # use arrow_array::{Int32Array, RecordBatch};
365    /// # use arrow_schema::{DataType, Field, Schema};
366    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
367    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
368    /// # use tempfile::tempfile;
369    /// # use futures::StreamExt;
370    /// # #[tokio::main(flavor="current_thread")]
371    /// # async fn main() {
372    /// #
373    /// # let mut file = tempfile().unwrap();
374    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
375    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
376    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
377    /// # writer.write(&batch).unwrap();
378    /// # writer.close().unwrap();
379    /// // open file with parquet data
380    /// let mut file = tokio::fs::File::from_std(file);
381    /// // load metadata once
382    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
383    /// // create two readers, a and b, from the same underlying file
384    /// // without reading the metadata again
385    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
386    ///     file.try_clone().await.unwrap(),
387    ///     meta.clone()
388    /// ).build().unwrap();
389    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
390    ///
391    /// // Can read batches from both readers in parallel
392    /// assert_eq!(
393    ///   a.next().await.unwrap().unwrap(),
394    ///   b.next().await.unwrap().unwrap(),
395    /// );
396    /// # }
397    /// ```
398    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
399        Self::new_builder(AsyncReader(input), metadata)
400    }
401
402    /// Reads the bloom filter for a column in a row group
403    ///
404    /// Returns `None` if the column does not have a bloom filter
405    ///
406    /// This function should be called after other forms of pruning, such as projection and predicate pushdown, have been applied.
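    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch of consulting a bloom filter before deciding whether to
    /// read a row group. It assumes the `data_index_bloom_encoding_stats.parquet`
    /// file from the parquet-testing data, whose single string column has a bloom
    /// filter.
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/data_index_bloom_encoding_stats.parquet", testdata);
    /// let file = tokio::fs::File::open(path).await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    /// // Check the bloom filter (if any) for column 0 of row group 0
    /// if let Some(sbbf) = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .await
    ///     .unwrap()
    /// {
    ///     // A bloom filter can only prove absence: `false` means the value is
    ///     // definitely not in this row group, `true` means it may be present
    ///     assert!(sbbf.check(&"Hello"));
    ///     assert!(!sbbf.check(&"Hello_Not_Exists"));
    /// }
    /// # }
    /// ```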
407    pub async fn get_row_group_column_bloom_filter(
408        &mut self,
409        row_group_idx: usize,
410        column_idx: usize,
411    ) -> Result<Option<Sbbf>> {
412        let metadata = self.metadata.row_group(row_group_idx);
413        let column_metadata = metadata.column(column_idx);
414
415        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
416            offset
417                .try_into()
418                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
419        } else {
420            return Ok(None);
421        };
422
423        let buffer = match column_metadata.bloom_filter_length() {
424            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
425            None => self
426                .input
427                .0
428                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
429        }
430        .await?;
431
432        let (header, bitset_offset) =
433            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
434
435        match header.algorithm {
436            BloomFilterAlgorithm::BLOCK(_) => {
437                // this match exists to future proof the singleton algorithm enum
438            }
439        }
440        match header.compression {
441            BloomFilterCompression::UNCOMPRESSED(_) => {
442                // this match exists to future proof the singleton compression enum
443            }
444        }
445        match header.hash {
446            BloomFilterHash::XXHASH(_) => {
447                // this match exists to future proof the singleton hash enum
448            }
449        }
450
451        let bitset = match column_metadata.bloom_filter_length() {
452            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
453            None => {
454                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
455                    ParquetError::General("Bloom filter length is invalid".to_string())
456                })?;
457                self.input
458                    .0
459                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
460                    .await?
461            }
462        };
463        Ok(Some(Sbbf::new(&bitset)))
464    }
465
466    /// Build a new [`ParquetRecordBatchStream`]
467    ///
468    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
469    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
470        let num_row_groups = self.metadata.row_groups().len();
471
472        let row_groups = match self.row_groups {
473            Some(row_groups) => {
474                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
475                    return Err(general_err!(
476                        "row group {} out of bounds 0..{}",
477                        col,
478                        num_row_groups
479                    ));
480                }
481                row_groups.into()
482            }
483            None => (0..self.metadata.row_groups().len()).collect(),
484        };
485
486        // Try to avoid allocating a large buffer
487        let batch_size = self
488            .batch_size
489            .min(self.metadata.file_metadata().num_rows() as usize);
490        let reader = ReaderFactory {
491            input: self.input.0,
492            filter: self.filter,
493            metadata: self.metadata.clone(),
494            fields: self.fields,
495            limit: self.limit,
496            offset: self.offset,
497        };
498
499        // Ensure schema of ParquetRecordBatchStream respects projection, and does
500        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
501        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
502            Some(DataType::Struct(fields)) => {
503                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
504            }
505            None => Fields::empty(),
506            _ => unreachable!("Must be Struct for root type"),
507        };
508        let schema = Arc::new(Schema::new(projected_fields));
509
510        Ok(ParquetRecordBatchStream {
511            metadata: self.metadata,
512            batch_size,
513            row_groups,
514            projection: self.projection,
515            selection: self.selection,
516            schema,
517            reader: Some(reader),
518            state: StreamState::Init,
519        })
520    }
521}
522
523type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
524
525/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
526/// [`ParquetRecordBatchReader`]
527struct ReaderFactory<T> {
528    metadata: Arc<ParquetMetaData>,
529
530    fields: Option<Arc<ParquetField>>,
531
532    input: T,
533
534    filter: Option<RowFilter>,
535
536    limit: Option<usize>,
537
538    offset: Option<usize>,
539}
540
541impl<T> ReaderFactory<T>
542where
543    T: AsyncFileReader + Send,
544{
545    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
546    ///
547    /// Note: this captures self so that the resulting future has a static lifetime
548    async fn read_row_group(
549        mut self,
550        row_group_idx: usize,
551        mut selection: Option<RowSelection>,
552        projection: ProjectionMask,
553        batch_size: usize,
554    ) -> ReadResult<T> {
555        // TODO: calling build_array_reader multiple times is wasteful
556
557        let meta = self.metadata.row_group(row_group_idx);
558        let offset_index = self
559            .metadata
560            .offset_index()
561            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
562            .filter(|index| !index.is_empty())
563            .map(|x| x[row_group_idx].as_slice());
564
565        let mut row_group = InMemoryRowGroup {
566            metadata: meta,
567            // schema: meta.schema_descr_ptr(),
568            row_count: meta.num_rows() as usize,
569            column_chunks: vec![None; meta.columns().len()],
570            offset_index,
571        };
572
573        if let Some(filter) = self.filter.as_mut() {
574            for predicate in filter.predicates.iter_mut() {
575                if !selects_any(selection.as_ref()) {
576                    return Ok((self, None));
577                }
578
579                let predicate_projection = predicate.projection();
580                row_group
581                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
582                    .await?;
583
584                let array_reader =
585                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
586
587                selection = Some(evaluate_predicate(
588                    batch_size,
589                    array_reader,
590                    selection,
591                    predicate.as_mut(),
592                )?);
593            }
594        }
595
596        // Compute the number of rows in the selection before applying limit and offset
597        let rows_before = selection
598            .as_ref()
599            .map(|s| s.row_count())
600            .unwrap_or(row_group.row_count);
601
602        if rows_before == 0 {
603            return Ok((self, None));
604        }
605
606        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);
607
608        // Compute the number of rows in the selection after applying limit and offset
609        let rows_after = selection
610            .as_ref()
611            .map(|s| s.row_count())
612            .unwrap_or(row_group.row_count);
613
614        // Update offset if necessary
615        if let Some(offset) = &mut self.offset {
616            // The reduction is due to either the offset or the limit; because the limit is
617            // only applied after the offset has been "exhausted", a saturating sub suffices here
618            *offset = offset.saturating_sub(rows_before - rows_after)
619        }
620
621        if rows_after == 0 {
622            return Ok((self, None));
623        }
624
625        if let Some(limit) = &mut self.limit {
626            *limit -= rows_after;
627        }
628
629        row_group
630            .fetch(&mut self.input, &projection, selection.as_ref())
631            .await?;
632
633        let reader = ParquetRecordBatchReader::new(
634            batch_size,
635            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
636            selection,
637        );
638
639        Ok((self, Some(reader)))
640    }
641}
642
643enum StreamState<T> {
644    /// At the start of a new row group, or the end of the parquet stream
645    Init,
646    /// Decoding a batch
647    Decoding(ParquetRecordBatchReader),
648    /// Reading data from input
649    Reading(BoxFuture<'static, ReadResult<T>>),
650    /// Error
651    Error,
652}
653
654impl<T> std::fmt::Debug for StreamState<T> {
655    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
656        match self {
657            StreamState::Init => write!(f, "StreamState::Init"),
658            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
659            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
660            StreamState::Error => write!(f, "StreamState::Error"),
661        }
662    }
663}
664
665/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
666///
667/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
668/// allowing users to decode record batches separately from I/O.
669///
670/// # I/O Buffering
671///
672/// `ParquetRecordBatchStream` buffers *all* data pages selected after applying the
673/// projection and any predicates, and decodes the rows from those buffered pages.
674///
675/// For example, if all rows and columns are selected, the entire row group is
676/// buffered in memory during decode. This minimizes the number of I/O operations
677/// required, which is especially important for object stores, where I/O operations
678/// have latencies in the hundreds of milliseconds.
679///
680///
681/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
682pub struct ParquetRecordBatchStream<T> {
683    metadata: Arc<ParquetMetaData>,
684
685    schema: SchemaRef,
686
687    row_groups: VecDeque<usize>,
688
689    projection: ProjectionMask,
690
691    batch_size: usize,
692
693    selection: Option<RowSelection>,
694
695    /// This is an option so it can be moved into a future
696    reader: Option<ReaderFactory<T>>,
697
698    state: StreamState<T>,
699}
700
701impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
702    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
703        f.debug_struct("ParquetRecordBatchStream")
704            .field("metadata", &self.metadata)
705            .field("schema", &self.schema)
706            .field("batch_size", &self.batch_size)
707            .field("projection", &self.projection)
708            .field("state", &self.state)
709            .finish()
710    }
711}
712
713impl<T> ParquetRecordBatchStream<T> {
714    /// Returns the projected [`SchemaRef`] for reading the parquet file.
715    ///
716    /// Note that the schema metadata will be stripped here. See
717    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
718    pub fn schema(&self) -> &SchemaRef {
719        &self.schema
720    }
721}
722
723impl<T> ParquetRecordBatchStream<T>
724where
725    T: AsyncFileReader + Unpin + Send + 'static,
726{
727    /// Fetches the next row group from the stream.
728    ///
729    /// Users can continue to call this function to get row groups and decode them concurrently.
730    ///
731    /// ## Notes
732    ///
733    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
734    ///
735    /// ## Returns
736    ///
737    /// - `Ok(None)` if the stream has ended.
738    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
739    /// - `Ok(Some(reader))` which holds all the data for the row group.
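    ///
    /// ## Example (sketch)
    ///
    /// A minimal sketch of separating I/O from decoding, assuming the
    /// `alltypes_plain.parquet` test file: each call fetches the data for one row
    /// group with async I/O, and the returned reader then decodes it synchronously
    /// (and could be moved to another task or thread).
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// # let file = tokio::fs::File::open(path).await.unwrap();
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    /// while let Some(reader) = stream.next_row_group().await.unwrap() {
    ///     // `reader` is a synchronous `ParquetRecordBatchReader` holding all the
    ///     // data for this row group
    ///     for batch in reader {
    ///         let batch = batch.unwrap();
    ///         assert!(batch.num_rows() > 0);
    ///     }
    /// }
    /// # }
    /// ```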
740    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
741        loop {
742            match &mut self.state {
743                StreamState::Decoding(_) | StreamState::Reading(_) => {
744                    return Err(ParquetError::General(
745                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
746                    ))
747                }
748                StreamState::Init => {
749                    let row_group_idx = match self.row_groups.pop_front() {
750                        Some(idx) => idx,
751                        None => return Ok(None),
752                    };
753
754                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
755
756                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
757
758                    let reader_factory = self.reader.take().expect("lost reader");
759
760                    let (reader_factory, maybe_reader) = reader_factory
761                        .read_row_group(
762                            row_group_idx,
763                            selection,
764                            self.projection.clone(),
765                            self.batch_size,
766                        )
767                        .await
768                        .map_err(|err| {
769                            self.state = StreamState::Error;
770                            err
771                        })?;
772                    self.reader = Some(reader_factory);
773
774                    if let Some(reader) = maybe_reader {
775                        return Ok(Some(reader));
776                    } else {
777                        // All rows skipped, read next row group
778                        continue;
779                    }
780                }
781                StreamState::Error => return Ok(None), // Ends the stream, as an error has occurred.
782            }
783        }
784    }
785}
786
787impl<T> Stream for ParquetRecordBatchStream<T>
788where
789    T: AsyncFileReader + Unpin + Send + 'static,
790{
791    type Item = Result<RecordBatch>;
792
793    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
794        loop {
795            match &mut self.state {
796                StreamState::Decoding(batch_reader) => match batch_reader.next() {
797                    Some(Ok(batch)) => {
798                        return Poll::Ready(Some(Ok(batch)));
799                    }
800                    Some(Err(e)) => {
801                        self.state = StreamState::Error;
802                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
803                    }
804                    None => self.state = StreamState::Init,
805                },
806                StreamState::Init => {
807                    let row_group_idx = match self.row_groups.pop_front() {
808                        Some(idx) => idx,
809                        None => return Poll::Ready(None),
810                    };
811
812                    let reader = self.reader.take().expect("lost reader");
813
814                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
815
816                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
817
818                    let fut = reader
819                        .read_row_group(
820                            row_group_idx,
821                            selection,
822                            self.projection.clone(),
823                            self.batch_size,
824                        )
825                        .boxed();
826
827                    self.state = StreamState::Reading(fut)
828                }
829                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
830                    Ok((reader_factory, maybe_reader)) => {
831                        self.reader = Some(reader_factory);
832                        match maybe_reader {
833                            // Read records from [`ParquetRecordBatchReader`]
834                            Some(reader) => self.state = StreamState::Decoding(reader),
835                            // All rows skipped, read next row group
836                            None => self.state = StreamState::Init,
837                        }
838                    }
839                    Err(e) => {
840                        self.state = StreamState::Error;
841                        return Poll::Ready(Some(Err(e)));
842                    }
843                },
844                StreamState::Error => return Poll::Ready(None), // Ends the stream, as an error has occurred.
845            }
846        }
847    }
848}
849
850/// An in-memory collection of column chunks
851struct InMemoryRowGroup<'a> {
852    metadata: &'a RowGroupMetaData,
853    offset_index: Option<&'a [OffsetIndexMetaData]>,
854    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
855    row_count: usize,
856}
857
858impl InMemoryRowGroup<'_> {
859    /// Fetches the necessary column data into memory
860    async fn fetch<T: AsyncFileReader + Send>(
861        &mut self,
862        input: &mut T,
863        projection: &ProjectionMask,
864        selection: Option<&RowSelection>,
865    ) -> Result<()> {
866        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
867            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
868            // `RowSelection`
869            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
870
871            let fetch_ranges = self
872                .column_chunks
873                .iter()
874                .zip(self.metadata.columns())
875                .enumerate()
876                .filter(|&(idx, (chunk, _chunk_meta))| {
877                    chunk.is_none() && projection.leaf_included(idx)
878                })
879                .flat_map(|(idx, (_chunk, chunk_meta))| {
880                    // If the first page does not start at the beginning of the column,
881                    // then we need to also fetch a dictionary page.
882                    let mut ranges = vec![];
883                    let (start, _len) = chunk_meta.byte_range();
884                    match offset_index[idx].page_locations.first() {
885                        Some(first) if first.offset as u64 != start => {
886                            ranges.push(start as usize..first.offset as usize);
887                        }
888                        _ => (),
889                    }
890
891                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
892                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
893
894                    ranges
895                })
896                .collect();
897
898            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
899            let mut page_start_offsets = page_start_offsets.into_iter();
900
901            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
902                if chunk.is_some() || !projection.leaf_included(idx) {
903                    continue;
904                }
905
906                if let Some(offsets) = page_start_offsets.next() {
907                    let mut chunks = Vec::with_capacity(offsets.len());
908                    for _ in 0..offsets.len() {
909                        chunks.push(chunk_data.next().unwrap());
910                    }
911
912                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
913                        length: self.metadata.column(idx).byte_range().1 as usize,
914                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
915                    }))
916                }
917            }
918        } else {
919            let fetch_ranges = self
920                .column_chunks
921                .iter()
922                .enumerate()
923                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
924                .map(|(idx, _chunk)| {
925                    let column = self.metadata.column(idx);
926                    let (start, length) = column.byte_range();
927                    start as usize..(start + length) as usize
928                })
929                .collect();
930
931            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
932
933            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
934                if chunk.is_some() || !projection.leaf_included(idx) {
935                    continue;
936                }
937
938                if let Some(data) = chunk_data.next() {
939                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
940                        offset: self.metadata.column(idx).byte_range().0 as usize,
941                        data,
942                    }));
943                }
944            }
945        }
946
947        Ok(())
948    }
949}
950
951impl RowGroups for InMemoryRowGroup<'_> {
952    fn num_rows(&self) -> usize {
953        self.row_count
954    }
955
956    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
957        match &self.column_chunks[i] {
958            None => Err(ParquetError::General(format!(
959                "Invalid column index {i}, column was not fetched"
960            ))),
961            Some(data) => {
962                let page_locations = self
963                    .offset_index
964                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
965                    .filter(|index| !index.is_empty())
966                    .map(|index| index[i].page_locations.clone());
967                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
968                    data.clone(),
969                    self.metadata.column(i),
970                    self.row_count,
971                    page_locations,
972                )?);
973
974                Ok(Box::new(ColumnChunkIterator {
975                    reader: Some(Ok(page_reader)),
976                }))
977            }
978        }
979    }
980}
981
982/// An in-memory column chunk
983#[derive(Clone)]
984enum ColumnChunkData {
985    /// Column chunk data representing only a subset of data pages
986    Sparse {
987        /// Length of the full column chunk
988        length: usize,
989        /// Set of data pages included in this sparse chunk. Each element is a tuple
990        /// of (page offset, page data)
991        data: Vec<(usize, Bytes)>,
992    },
993    /// Full column chunk and its offset
994    Dense { offset: usize, data: Bytes },
995}
996
997impl ColumnChunkData {
998    fn get(&self, start: u64) -> Result<Bytes> {
999        match &self {
1000            ColumnChunkData::Sparse { data, .. } => data
1001                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1002                .map(|idx| data[idx].1.clone())
1003                .map_err(|_| {
1004                    ParquetError::General(format!(
1005                        "Invalid offset in sparse column chunk data: {start}"
1006                    ))
1007                }),
1008            ColumnChunkData::Dense { offset, data } => {
1009                let start = start as usize - *offset;
1010                Ok(data.slice(start..))
1011            }
1012        }
1013    }
1014}
1015
1016impl Length for ColumnChunkData {
1017    fn len(&self) -> u64 {
1018        match &self {
1019            ColumnChunkData::Sparse { length, .. } => *length as u64,
1020            ColumnChunkData::Dense { data, .. } => data.len() as u64,
1021        }
1022    }
1023}
1024
1025impl ChunkReader for ColumnChunkData {
1026    type T = bytes::buf::Reader<Bytes>;
1027
1028    fn get_read(&self, start: u64) -> Result<Self::T> {
1029        Ok(self.get(start)?.reader())
1030    }
1031
1032    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1033        Ok(self.get(start)?.slice(..length))
1034    }
1035}
1036
1037/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
1038struct ColumnChunkIterator {
1039    reader: Option<Result<Box<dyn PageReader>>>,
1040}
1041
1042impl Iterator for ColumnChunkIterator {
1043    type Item = Result<Box<dyn PageReader>>;
1044
1045    fn next(&mut self) -> Option<Self::Item> {
1046        self.reader.take()
1047    }
1048}
1049
1050impl PageIterator for ColumnChunkIterator {}
1051
1052#[cfg(test)]
1053mod tests {
1054    use super::*;
1055    use crate::arrow::arrow_reader::{
1056        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1057    };
1058    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1059    use crate::arrow::ArrowWriter;
1060    use crate::file::metadata::ParquetMetaDataReader;
1061    use crate::file::properties::WriterProperties;
1062    use arrow::compute::kernels::cmp::eq;
1063    use arrow::error::Result as ArrowResult;
1064    use arrow_array::builder::{ListBuilder, StringBuilder};
1065    use arrow_array::cast::AsArray;
1066    use arrow_array::types::Int32Type;
1067    use arrow_array::{
1068        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1069        StructArray, UInt64Array,
1070    };
1071    use arrow_schema::{DataType, Field, Schema};
1072    use futures::{StreamExt, TryStreamExt};
1073    use rand::{thread_rng, Rng};
1074    use std::collections::HashMap;
1075    use std::sync::{Arc, Mutex};
1076    use tempfile::tempfile;
1077
1078    #[derive(Clone)]
1079    struct TestReader {
1080        data: Bytes,
1081        metadata: Arc<ParquetMetaData>,
1082        requests: Arc<Mutex<Vec<Range<usize>>>>,
1083    }
1084
1085    impl AsyncFileReader for TestReader {
1086        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
1087            self.requests.lock().unwrap().push(range.clone());
1088            futures::future::ready(Ok(self.data.slice(range))).boxed()
1089        }
1090
1091        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
1092            futures::future::ready(Ok(self.metadata.clone())).boxed()
1093        }
1094    }
1095
1096    #[tokio::test]
1097    async fn test_async_reader() {
1098        let testdata = arrow::util::test_util::parquet_test_data();
1099        let path = format!("{testdata}/alltypes_plain.parquet");
1100        let data = Bytes::from(std::fs::read(path).unwrap());
1101
1102        let metadata = ParquetMetaDataReader::new()
1103            .parse_and_finish(&data)
1104            .unwrap();
1105        let metadata = Arc::new(metadata);
1106
1107        assert_eq!(metadata.num_row_groups(), 1);
1108
1109        let async_reader = TestReader {
1110            data: data.clone(),
1111            metadata: metadata.clone(),
1112            requests: Default::default(),
1113        };
1114
1115        let requests = async_reader.requests.clone();
1116        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1117            .await
1118            .unwrap();
1119
1120        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1121        let stream = builder
1122            .with_projection(mask.clone())
1123            .with_batch_size(1024)
1124            .build()
1125            .unwrap();
1126
1127        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1128
1129        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1130            .unwrap()
1131            .with_projection(mask)
1132            .with_batch_size(104)
1133            .build()
1134            .unwrap()
1135            .collect::<ArrowResult<Vec<_>>>()
1136            .unwrap();
1137
1138        assert_eq!(async_batches, sync_batches);
1139
1140        let requests = requests.lock().unwrap();
1141        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1142        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1143
1144        assert_eq!(
1145            &requests[..],
1146            &[
1147                offset_1 as usize..(offset_1 + length_1) as usize,
1148                offset_2 as usize..(offset_2 + length_2) as usize
1149            ]
1150        );
1151    }
1152
1153    #[tokio::test]
1154    async fn test_async_reader_with_next_row_group() {
1155        let testdata = arrow::util::test_util::parquet_test_data();
1156        let path = format!("{testdata}/alltypes_plain.parquet");
1157        let data = Bytes::from(std::fs::read(path).unwrap());
1158
1159        let metadata = ParquetMetaDataReader::new()
1160            .parse_and_finish(&data)
1161            .unwrap();
1162        let metadata = Arc::new(metadata);
1163
1164        assert_eq!(metadata.num_row_groups(), 1);
1165
1166        let async_reader = TestReader {
1167            data: data.clone(),
1168            metadata: metadata.clone(),
1169            requests: Default::default(),
1170        };
1171
1172        let requests = async_reader.requests.clone();
1173        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1174            .await
1175            .unwrap();
1176
1177        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1178        let mut stream = builder
1179            .with_projection(mask.clone())
1180            .with_batch_size(1024)
1181            .build()
1182            .unwrap();
1183
1184        let mut readers = vec![];
1185        while let Some(reader) = stream.next_row_group().await.unwrap() {
1186            readers.push(reader);
1187        }
1188
1189        let async_batches: Vec<_> = readers
1190            .into_iter()
1191            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1192            .collect();
1193
1194        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1195            .unwrap()
1196            .with_projection(mask)
1197            .with_batch_size(104)
1198            .build()
1199            .unwrap()
1200            .collect::<ArrowResult<Vec<_>>>()
1201            .unwrap();
1202
1203        assert_eq!(async_batches, sync_batches);
1204
1205        let requests = requests.lock().unwrap();
1206        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1207        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1208
1209        assert_eq!(
1210            &requests[..],
1211            &[
1212                offset_1 as usize..(offset_1 + length_1) as usize,
1213                offset_2 as usize..(offset_2 + length_2) as usize
1214            ]
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn test_async_reader_with_index() {
1220        let testdata = arrow::util::test_util::parquet_test_data();
1221        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1222        let data = Bytes::from(std::fs::read(path).unwrap());
1223
1224        let metadata = ParquetMetaDataReader::new()
1225            .parse_and_finish(&data)
1226            .unwrap();
1227        let metadata = Arc::new(metadata);
1228
1229        assert_eq!(metadata.num_row_groups(), 1);
1230
1231        let async_reader = TestReader {
1232            data: data.clone(),
1233            metadata: metadata.clone(),
1234            requests: Default::default(),
1235        };
1236
1237        let options = ArrowReaderOptions::new().with_page_index(true);
1238        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1239            .await
1240            .unwrap();
1241
1242        // The builder should have page and offset indexes loaded now
1243        let metadata_with_index = builder.metadata();
1244
1245        // Check offset indexes are present for all columns
1246        let offset_index = metadata_with_index.offset_index().unwrap();
1247        let column_index = metadata_with_index.column_index().unwrap();
1248
1249        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1250        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1251
1252        let num_columns = metadata_with_index
1253            .file_metadata()
1254            .schema_descr()
1255            .num_columns();
1256
1257        // Check page indexes are present for all columns
1258        offset_index
1259            .iter()
1260            .for_each(|x| assert_eq!(x.len(), num_columns));
1261        column_index
1262            .iter()
1263            .for_each(|x| assert_eq!(x.len(), num_columns));
1264
1265        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1266        let stream = builder
1267            .with_projection(mask.clone())
1268            .with_batch_size(1024)
1269            .build()
1270            .unwrap();
1271
1272        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1273
1274        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1275            .unwrap()
1276            .with_projection(mask)
1277            .with_batch_size(1024)
1278            .build()
1279            .unwrap()
1280            .collect::<ArrowResult<Vec<_>>>()
1281            .unwrap();
1282
1283        assert_eq!(async_batches, sync_batches);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_async_reader_with_limit() {
1288        let testdata = arrow::util::test_util::parquet_test_data();
1289        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1290        let data = Bytes::from(std::fs::read(path).unwrap());
1291
1292        let metadata = ParquetMetaDataReader::new()
1293            .parse_and_finish(&data)
1294            .unwrap();
1295        let metadata = Arc::new(metadata);
1296
1297        assert_eq!(metadata.num_row_groups(), 1);
1298
1299        let async_reader = TestReader {
1300            data: data.clone(),
1301            metadata: metadata.clone(),
1302            requests: Default::default(),
1303        };
1304
1305        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1306            .await
1307            .unwrap();
1308
1309        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1310        let stream = builder
1311            .with_projection(mask.clone())
1312            .with_batch_size(1024)
1313            .with_limit(1)
1314            .build()
1315            .unwrap();
1316
1317        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1318
1319        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1320            .unwrap()
1321            .with_projection(mask)
1322            .with_batch_size(1024)
1323            .with_limit(1)
1324            .build()
1325            .unwrap()
1326            .collect::<ArrowResult<Vec<_>>>()
1327            .unwrap();
1328
1329        assert_eq!(async_batches, sync_batches);
1330    }
1331
1332    #[tokio::test]
1333    async fn test_async_reader_skip_pages() {
1334        let testdata = arrow::util::test_util::parquet_test_data();
1335        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1336        let data = Bytes::from(std::fs::read(path).unwrap());
1337
1338        let metadata = ParquetMetaDataReader::new()
1339            .parse_and_finish(&data)
1340            .unwrap();
1341        let metadata = Arc::new(metadata);
1342
1343        assert_eq!(metadata.num_row_groups(), 1);
1344
1345        let async_reader = TestReader {
1346            data: data.clone(),
1347            metadata: metadata.clone(),
1348            requests: Default::default(),
1349        };
1350
1351        let options = ArrowReaderOptions::new().with_page_index(true);
1352        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1353            .await
1354            .unwrap();
1355
1356        let selection = RowSelection::from(vec![
1357            RowSelector::skip(21),   // Skip first page
1358            RowSelector::select(21), // Select page to boundary
1359            RowSelector::skip(41),   // Skip multiple pages
1360            RowSelector::select(41), // Select multiple pages
1361            RowSelector::skip(25),   // Skip page across boundary
1362            RowSelector::select(25), // Select across page boundary
1363            RowSelector::skip(7116), // Skip to final page boundary
1364            RowSelector::select(10), // Select final page
1365        ]);
1366
1367        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1368
1369        let stream = builder
1370            .with_projection(mask.clone())
1371            .with_row_selection(selection.clone())
1372            .build()
1373            .expect("building stream");
1374
1375        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1376
1377        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1378            .unwrap()
1379            .with_projection(mask)
1380            .with_batch_size(1024)
1381            .with_row_selection(selection)
1382            .build()
1383            .unwrap()
1384            .collect::<ArrowResult<Vec<_>>>()
1385            .unwrap();
1386
1387        assert_eq!(async_batches, sync_batches);
1388    }
1389
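    // Fuzz test: random skip/select patterns over a random column must return exactly the selected rows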
1390    #[tokio::test]
1391    async fn test_fuzz_async_reader_selection() {
1392        let testdata = arrow::util::test_util::parquet_test_data();
1393        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1394        let data = Bytes::from(std::fs::read(path).unwrap());
1395
1396        let metadata = ParquetMetaDataReader::new()
1397            .parse_and_finish(&data)
1398            .unwrap();
1399        let metadata = Arc::new(metadata);
1400
1401        assert_eq!(metadata.num_row_groups(), 1);
1402
1403        let mut rand = thread_rng();
1404
1405        for _ in 0..100 {
1406            let mut expected_rows = 0;
1407            let mut total_rows = 0;
1408            let mut skip = false;
1409            let mut selectors = vec![];
1410
1411            while total_rows < 7300 {
1412                let row_count: usize = rand.gen_range(1..100);
1413
1414                let row_count = row_count.min(7300 - total_rows);
1415
1416                selectors.push(RowSelector { row_count, skip });
1417
1418                total_rows += row_count;
1419                if !skip {
1420                    expected_rows += row_count;
1421                }
1422
1423                skip = !skip;
1424            }
1425
1426            let selection = RowSelection::from(selectors);
1427
1428            let async_reader = TestReader {
1429                data: data.clone(),
1430                metadata: metadata.clone(),
1431                requests: Default::default(),
1432            };
1433
1434            let options = ArrowReaderOptions::new().with_page_index(true);
1435            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1436                .await
1437                .unwrap();
1438
1439            let col_idx: usize = rand.gen_range(0..13);
1440            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1441
1442            let stream = builder
1443                .with_projection(mask.clone())
1444                .with_row_selection(selection.clone())
1445                .build()
1446                .expect("building stream");
1447
1448            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1449
1450            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1451
1452            assert_eq!(actual_rows, expected_rows);
1453        }
1454    }
1455
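    // A `RowSelection` beginning with a zero-row selector should still yield the expected row count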
1456    #[tokio::test]
1457    async fn test_async_reader_zero_row_selector() {
1458        // See https://github.com/apache/arrow-rs/issues/2669
1459        let testdata = arrow::util::test_util::parquet_test_data();
1460        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1461        let data = Bytes::from(std::fs::read(path).unwrap());
1462
1463        let metadata = ParquetMetaDataReader::new()
1464            .parse_and_finish(&data)
1465            .unwrap();
1466        let metadata = Arc::new(metadata);
1467
1468        assert_eq!(metadata.num_row_groups(), 1);
1469
1470        let mut rand = thread_rng();
1471
1472        let mut expected_rows = 0;
1473        let mut total_rows = 0;
1474        let mut skip = false;
1475        let mut selectors = vec![];
1476
1477        selectors.push(RowSelector {
1478            row_count: 0,
1479            skip: false,
1480        });
1481
1482        while total_rows < 7300 {
1483            let row_count: usize = rand.gen_range(1..100);
1484
1485            let row_count = row_count.min(7300 - total_rows);
1486
1487            selectors.push(RowSelector { row_count, skip });
1488
1489            total_rows += row_count;
1490            if !skip {
1491                expected_rows += row_count;
1492            }
1493
1494            skip = !skip;
1495        }
1496
1497        let selection = RowSelection::from(selectors);
1498
1499        let async_reader = TestReader {
1500            data: data.clone(),
1501            metadata: metadata.clone(),
1502            requests: Default::default(),
1503        };
1504
1505        let options = ArrowReaderOptions::new().with_page_index(true);
1506        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1507            .await
1508            .unwrap();
1509
1510        let col_idx: usize = rand.gen_range(0..13);
1511        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1512
1513        let stream = builder
1514            .with_projection(mask.clone())
1515            .with_row_selection(selection.clone())
1516            .build()
1517            .expect("building stream");
1518
1519        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1520
1521        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1522
1523        assert_eq!(actual_rows, expected_rows);
1524    }
1525
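    // Apply a two-predicate `RowFilter` and verify that only the matching row is returned,
    // and that only a small, fixed number of byte-range requests are made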
1526    #[tokio::test]
1527    async fn test_row_filter() {
1528        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1529        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1530        let c = Int32Array::from_iter(0..6);
1531        let data = RecordBatch::try_from_iter([
1532            ("a", Arc::new(a) as ArrayRef),
1533            ("b", Arc::new(b) as ArrayRef),
1534            ("c", Arc::new(c) as ArrayRef),
1535        ])
1536        .unwrap();
1537
1538        let mut buf = Vec::with_capacity(1024);
1539        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1540        writer.write(&data).unwrap();
1541        writer.close().unwrap();
1542
1543        let data: Bytes = buf.into();
1544        let metadata = ParquetMetaDataReader::new()
1545            .parse_and_finish(&data)
1546            .unwrap();
1547        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1548
1549        let test = TestReader {
1550            data,
1551            metadata: Arc::new(metadata),
1552            requests: Default::default(),
1553        };
1554        let requests = test.requests.clone();
1555
1556        let a_scalar = StringArray::from_iter_values(["b"]);
1557        let a_filter = ArrowPredicateFn::new(
1558            ProjectionMask::leaves(&parquet_schema, vec![0]),
1559            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1560        );
1561
1562        let b_scalar = StringArray::from_iter_values(["4"]);
1563        let b_filter = ArrowPredicateFn::new(
1564            ProjectionMask::leaves(&parquet_schema, vec![1]),
1565            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1566        );
1567
1568        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1569
1570        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1571        let stream = ParquetRecordBatchStreamBuilder::new(test)
1572            .await
1573            .unwrap()
1574            .with_projection(mask.clone())
1575            .with_batch_size(1024)
1576            .with_row_filter(filter)
1577            .build()
1578            .unwrap();
1579
1580        let batches: Vec<_> = stream.try_collect().await.unwrap();
1581        assert_eq!(batches.len(), 1);
1582
1583        let batch = &batches[0];
1584        assert_eq!(batch.num_rows(), 1);
1585        assert_eq!(batch.num_columns(), 2);
1586
1587        let col = batch.column(0);
1588        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1589        assert_eq!(val, "b");
1590
1591        let col = batch.column(1);
1592        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1593        assert_eq!(val, 3);
1594
1595        // Should only have made 3 requests
1596        assert_eq!(requests.lock().unwrap().len(), 3);
1597    }
1598
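    // Verify `with_limit` and `with_offset` are applied correctly across multiple row groups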
1599    #[tokio::test]
1600    async fn test_limit_multiple_row_groups() {
1601        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1602        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1603        let c = Int32Array::from_iter(0..6);
1604        let data = RecordBatch::try_from_iter([
1605            ("a", Arc::new(a) as ArrayRef),
1606            ("b", Arc::new(b) as ArrayRef),
1607            ("c", Arc::new(c) as ArrayRef),
1608        ])
1609        .unwrap();
1610
1611        let mut buf = Vec::with_capacity(1024);
1612        let props = WriterProperties::builder()
1613            .set_max_row_group_size(3)
1614            .build();
1615        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1616        writer.write(&data).unwrap();
1617        writer.close().unwrap();
1618
1619        let data: Bytes = buf.into();
1620        let metadata = ParquetMetaDataReader::new()
1621            .parse_and_finish(&data)
1622            .unwrap();
1623
1624        assert_eq!(metadata.num_row_groups(), 2);
1625
1626        let test = TestReader {
1627            data,
1628            metadata: Arc::new(metadata),
1629            requests: Default::default(),
1630        };
1631
1632        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1633            .await
1634            .unwrap()
1635            .with_batch_size(1024)
1636            .with_limit(4)
1637            .build()
1638            .unwrap();
1639
1640        let batches: Vec<_> = stream.try_collect().await.unwrap();
1641        // Expect one batch for each row group
1642        assert_eq!(batches.len(), 2);
1643
1644        let batch = &batches[0];
1645        // First batch should contain all rows
1646        assert_eq!(batch.num_rows(), 3);
1647        assert_eq!(batch.num_columns(), 3);
1648        let col2 = batch.column(2).as_primitive::<Int32Type>();
1649        assert_eq!(col2.values(), &[0, 1, 2]);
1650
1651        let batch = &batches[1];
1652        // Second batch should trigger the limit and only have one row
1653        assert_eq!(batch.num_rows(), 1);
1654        assert_eq!(batch.num_columns(), 3);
1655        let col2 = batch.column(2).as_primitive::<Int32Type>();
1656        assert_eq!(col2.values(), &[3]);
1657
1658        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1659            .await
1660            .unwrap()
1661            .with_offset(2)
1662            .with_limit(3)
1663            .build()
1664            .unwrap();
1665
1666        let batches: Vec<_> = stream.try_collect().await.unwrap();
1667        // Expect one batch for each row group
1668        assert_eq!(batches.len(), 2);
1669
1670        let batch = &batches[0];
1671        // First batch should contain one row
1672        assert_eq!(batch.num_rows(), 1);
1673        assert_eq!(batch.num_columns(), 3);
1674        let col2 = batch.column(2).as_primitive::<Int32Type>();
1675        assert_eq!(col2.values(), &[2]);
1676
1677        let batch = &batches[1];
1678        // Second batch should contain two rows
1679        assert_eq!(batch.num_rows(), 2);
1680        assert_eq!(batch.num_columns(), 3);
1681        let col2 = batch.column(2).as_primitive::<Int32Type>();
1682        assert_eq!(col2.values(), &[3, 4]);
1683
1684        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1685            .await
1686            .unwrap()
1687            .with_offset(4)
1688            .with_limit(20)
1689            .build()
1690            .unwrap();
1691
1692        let batches: Vec<_> = stream.try_collect().await.unwrap();
1693        // Should skip first row group
1694        assert_eq!(batches.len(), 1);
1695
1696        let batch = &batches[0];
1697        // First batch should contain two rows
1698        assert_eq!(batch.num_rows(), 2);
1699        assert_eq!(batch.num_columns(), 3);
1700        let col2 = batch.column(2).as_primitive::<Int32Type>();
1701        assert_eq!(col2.values(), &[4, 5]);
1702    }
1703
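    // Combine a `RowFilter` with the page index enabled and check the resulting row count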
1704    #[tokio::test]
1705    async fn test_row_filter_with_index() {
1706        let testdata = arrow::util::test_util::parquet_test_data();
1707        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1708        let data = Bytes::from(std::fs::read(path).unwrap());
1709
1710        let metadata = ParquetMetaDataReader::new()
1711            .parse_and_finish(&data)
1712            .unwrap();
1713        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1714        let metadata = Arc::new(metadata);
1715
1716        assert_eq!(metadata.num_row_groups(), 1);
1717
1718        let async_reader = TestReader {
1719            data: data.clone(),
1720            metadata: metadata.clone(),
1721            requests: Default::default(),
1722        };
1723
1724        let a_filter =
1725            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1726                Ok(batch.column(0).as_boolean().clone())
1727            });
1728
1729        let b_scalar = Int8Array::from(vec![2]);
1730        let b_filter = ArrowPredicateFn::new(
1731            ProjectionMask::leaves(&parquet_schema, vec![2]),
1732            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1733        );
1734
1735        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1736
1737        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1738
1739        let options = ArrowReaderOptions::new().with_page_index(true);
1740        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1741            .await
1742            .unwrap()
1743            .with_projection(mask.clone())
1744            .with_batch_size(1024)
1745            .with_row_filter(filter)
1746            .build()
1747            .unwrap();
1748
1749        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1750
1751        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1752
1753        assert_eq!(total_rows, 730);
1754    }
1755
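    // A sparse `RowSelection` should only fetch the byte ranges of the pages it selects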
1756    #[tokio::test]
1757    async fn test_in_memory_row_group_sparse() {
1758        let testdata = arrow::util::test_util::parquet_test_data();
1759        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1760        let data = Bytes::from(std::fs::read(path).unwrap());
1761
1762        let metadata = ParquetMetaDataReader::new()
1763            .with_page_indexes(true)
1764            .parse_and_finish(&data)
1765            .unwrap();
1766
1767        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1768
1769        let mut metadata_builder = metadata.into_builder();
1770        let mut row_groups = metadata_builder.take_row_groups();
1771        row_groups.truncate(1);
1772        let row_group_meta = row_groups.pop().unwrap();
1773
1774        let metadata = metadata_builder
1775            .add_row_group(row_group_meta)
1776            .set_column_index(None)
1777            .set_offset_index(Some(vec![offset_index.clone()]))
1778            .build();
1779
1780        let metadata = Arc::new(metadata);
1781
1782        let num_rows = metadata.row_group(0).num_rows();
1783
1784        assert_eq!(metadata.num_row_groups(), 1);
1785
1786        let async_reader = TestReader {
1787            data: data.clone(),
1788            metadata: metadata.clone(),
1789            requests: Default::default(),
1790        };
1791
1792        let requests = async_reader.requests.clone();
1793        let (_, fields) = parquet_to_arrow_schema_and_fields(
1794            metadata.file_metadata().schema_descr(),
1795            ProjectionMask::all(),
1796            None,
1797        )
1798        .unwrap();
1799
1800        let _schema_desc = metadata.file_metadata().schema_descr();
1801
1802        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1803
1804        let reader_factory = ReaderFactory {
1805            metadata,
1806            fields: fields.map(Arc::new),
1807            input: async_reader,
1808            filter: None,
1809            limit: None,
1810            offset: None,
1811        };
1812
1813        let mut skip = true;
1814        let mut pages = offset_index[0].page_locations.iter().peekable();
1815
1816        // Set up the `RowSelection` so that we skip every other page, selecting the last page
1817        let mut selectors = vec![];
1818        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1819        while let Some(page) = pages.next() {
1820            let num_rows = if let Some(next_page) = pages.peek() {
1821                next_page.first_row_index - page.first_row_index
1822            } else {
1823                num_rows - page.first_row_index
1824            };
1825
1826            if skip {
1827                selectors.push(RowSelector::skip(num_rows as usize));
1828            } else {
1829                selectors.push(RowSelector::select(num_rows as usize));
1830                let start = page.offset as usize;
1831                let end = start + page.compressed_page_size as usize;
1832                expected_page_requests.push(start..end);
1833            }
1834            skip = !skip;
1835        }
1836
1837        let selection = RowSelection::from(selectors);
1838
1839        let (_factory, _reader) = reader_factory
1840            .read_row_group(0, Some(selection), projection.clone(), 48)
1841            .await
1842            .expect("reading row group");
1843
1844        let requests = requests.lock().unwrap();
1845
1846        assert_eq!(&requests[..], &expected_page_requests)
1847    }
1848
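    // A batch size larger than the file should be clamped to the file's row count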
1849    #[tokio::test]
1850    async fn test_batch_size_overallocate() {
1851        let testdata = arrow::util::test_util::parquet_test_data();
1852        // `alltypes_plain.parquet` only has 8 rows
1853        let path = format!("{testdata}/alltypes_plain.parquet");
1854        let data = Bytes::from(std::fs::read(path).unwrap());
1855
1856        let metadata = ParquetMetaDataReader::new()
1857            .parse_and_finish(&data)
1858            .unwrap();
1859        let file_rows = metadata.file_metadata().num_rows() as usize;
1860        let metadata = Arc::new(metadata);
1861
1862        let async_reader = TestReader {
1863            data: data.clone(),
1864            metadata: metadata.clone(),
1865            requests: Default::default(),
1866        };
1867
1868        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1869            .await
1870            .unwrap();
1871
1872        let stream = builder
1873            .with_projection(ProjectionMask::all())
1874            .with_batch_size(1024)
1875            .build()
1876            .unwrap();
1877        assert_ne!(1024, file_rows);
1878        assert_eq!(stream.batch_size, file_rows);
1879    }
1880
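    // Read a bloom filter from a file whose metadata does not record `bloom_filter_length`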
1881    #[tokio::test]
1882    async fn test_get_row_group_column_bloom_filter_without_length() {
1883        let testdata = arrow::util::test_util::parquet_test_data();
1884        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1885        let data = Bytes::from(std::fs::read(path).unwrap());
1886        test_get_row_group_column_bloom_filter(data, false).await;
1887    }
1888
1889    #[tokio::test]
1890    async fn test_parquet_record_batch_stream_schema() {
1891        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1892            schema.flattened_fields().iter().map(|f| f.name()).collect()
1893        }
1894
1895        // ParquetRecordBatchReaderBuilder::schema differs from ParquetRecordBatchReader::schema
1896        // and RecordBatch::schema: the builder schema retains all fields and any custom
1897        // key/value metadata, while the reader and batch schemas contain only the projected
1898        // fields and no metadata. Test to ensure this behaviour remains consistent.
1899        //
1900        // Ensure the same holds for the asynchronous versions of the above.
1901
1902        // Prepare data for a schema with nested fields and custom metadata
1903        let mut metadata = HashMap::with_capacity(1);
1904        metadata.insert("key".to_string(), "value".to_string());
1905
1906        let nested_struct_array = StructArray::from(vec![
1907            (
1908                Arc::new(Field::new("d", DataType::Utf8, true)),
1909                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1910            ),
1911            (
1912                Arc::new(Field::new("e", DataType::Utf8, true)),
1913                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1914            ),
1915        ]);
1916        let struct_array = StructArray::from(vec![
1917            (
1918                Arc::new(Field::new("a", DataType::Int32, true)),
1919                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1920            ),
1921            (
1922                Arc::new(Field::new("b", DataType::UInt64, true)),
1923                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1924            ),
1925            (
1926                Arc::new(Field::new(
1927                    "c",
1928                    nested_struct_array.data_type().clone(),
1929                    true,
1930                )),
1931                Arc::new(nested_struct_array) as ArrayRef,
1932            ),
1933        ]);
1934
1935        let schema =
1936            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1937        let record_batch = RecordBatch::from(struct_array)
1938            .with_schema(schema.clone())
1939            .unwrap();
1940
1941        // Write parquet with custom metadata in schema
1942        let mut file = tempfile().unwrap();
1943        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1944        writer.write(&record_batch).unwrap();
1945        writer.close().unwrap();
1946
1947        let all_fields = ["a", "b", "c", "d", "e"];
1948        // (leaf indices in the projection mask, expected flattened field names in the output schema)
1949        let projections = [
1950            (vec![], vec![]),
1951            (vec![0], vec!["a"]),
1952            (vec![0, 1], vec!["a", "b"]),
1953            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1954            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1955        ];
1956
1957        // Ensure we're consistent for each of these projections
1958        for (indices, expected_projected_names) in projections {
1959            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1960                // Builder schema should preserve all fields and metadata
1961                assert_eq!(get_all_field_names(&builder), all_fields);
1962                assert_eq!(builder.metadata, metadata);
1963                // Reader & batch schema should show only projected fields, and no metadata
1964                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1965                assert_eq!(reader.metadata, HashMap::default());
1966                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1967                assert_eq!(batch.metadata, HashMap::default());
1968            };
1969
1970            let builder =
1971                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1972            let sync_builder_schema = builder.schema().clone();
1973            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1974            let mut reader = builder.with_projection(mask).build().unwrap();
1975            let sync_reader_schema = reader.schema();
1976            let batch = reader.next().unwrap().unwrap();
1977            let sync_batch_schema = batch.schema();
1978            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1979
1980            // The asynchronous reader should behave the same way
1981            let file = tokio::fs::File::from(file.try_clone().unwrap());
1982            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1983            let async_builder_schema = builder.schema().clone();
1984            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1985            let mut reader = builder.with_projection(mask).build().unwrap();
1986            let async_reader_schema = reader.schema().clone();
1987            let batch = reader.next().await.unwrap().unwrap();
1988            let async_batch_schema = batch.schema();
1989            assert_schemas(
1990                async_builder_schema,
1991                async_reader_schema,
1992                async_batch_schema,
1993            );
1994        }
1995    }
1996
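    // Read a bloom filter from a newly written file whose metadata records `bloom_filter_length`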
1997    #[tokio::test]
1998    async fn test_get_row_group_column_bloom_filter_with_length() {
1999        // Rewrite the test data to a new parquet file whose metadata records bloom_filter_length
2000        let testdata = arrow::util::test_util::parquet_test_data();
2001        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2002        let data = Bytes::from(std::fs::read(path).unwrap());
2003        let metadata = ParquetMetaDataReader::new()
2004            .parse_and_finish(&data)
2005            .unwrap();
2006        let metadata = Arc::new(metadata);
2007        let async_reader = TestReader {
2008            data: data.clone(),
2009            metadata: metadata.clone(),
2010            requests: Default::default(),
2011        };
2012        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2013            .await
2014            .unwrap();
2015        let schema = builder.schema().clone();
2016        let stream = builder.build().unwrap();
2017        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2018
2019        let mut parquet_data = Vec::new();
2020        let props = WriterProperties::builder()
2021            .set_bloom_filter_enabled(true)
2022            .build();
2023        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2024        for batch in batches {
2025            writer.write(&batch).unwrap();
2026        }
2027        writer.close().unwrap();
2028
2029        // Test the new parquet file
2030        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2031    }
2032
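    // Shared helper: read the bloom filter for row group 0, column 0 and probe it
    // for a value that is present and one that is not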
2033    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2034        let metadata = ParquetMetaDataReader::new()
2035            .parse_and_finish(&data)
2036            .unwrap();
2037        let metadata = Arc::new(metadata);
2038
2039        assert_eq!(metadata.num_row_groups(), 1);
2040        let row_group = metadata.row_group(0);
2041        let column = row_group.column(0);
2042        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2043
2044        let async_reader = TestReader {
2045            data: data.clone(),
2046            metadata: metadata.clone(),
2047            requests: Default::default(),
2048        };
2049
2050        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2051            .await
2052            .unwrap();
2053
2054        let sbbf = builder
2055            .get_row_group_column_bloom_filter(0, 0)
2056            .await
2057            .unwrap()
2058            .unwrap();
2059        assert!(sbbf.check(&"Hello"));
2060        assert!(!sbbf.check(&"Hello_Not_Exists"));
2061    }
2062
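    // Skipping rows in a nested (list) column across page boundaries should yield the selected row count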
2063    #[tokio::test]
2064    async fn test_nested_skip() {
2065        let schema = Arc::new(Schema::new(vec![
2066            Field::new("col_1", DataType::UInt64, false),
2067            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2068        ]));
2069
2070        // Writer properties that force multiple small data pages (256 rows each)
2071        let props = WriterProperties::builder()
2072            .set_data_page_row_count_limit(256)
2073            .set_write_batch_size(256)
2074            .set_max_row_group_size(1024);
2075
2076        // Write data
2077        let mut file = tempfile().unwrap();
2078        let mut writer =
2079            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2080
2081        let mut builder = ListBuilder::new(StringBuilder::new());
2082        for id in 0..1024 {
2083            match id % 3 {
2084                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2085                1 => builder.append_value([Some(format!("id_{id}"))]),
2086                _ => builder.append_null(),
2087            }
2088        }
2089        let refs = vec![
2090            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2091            Arc::new(builder.finish()) as ArrayRef,
2092        ];
2093
2094        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2095        writer.write(&batch).unwrap();
2096        writer.close().unwrap();
2097
2098        let selections = [
2099            RowSelection::from(vec![
2100                RowSelector::skip(313),
2101                RowSelector::select(1),
2102                RowSelector::skip(709),
2103                RowSelector::select(1),
2104            ]),
2105            RowSelection::from(vec![
2106                RowSelector::skip(255),
2107                RowSelector::select(1),
2108                RowSelector::skip(767),
2109                RowSelector::select(1),
2110            ]),
2111            RowSelection::from(vec![
2112                RowSelector::select(255),
2113                RowSelector::skip(1),
2114                RowSelector::select(767),
2115                RowSelector::skip(1),
2116            ]),
2117            RowSelection::from(vec![
2118                RowSelector::skip(254),
2119                RowSelector::select(1),
2120                RowSelector::select(1),
2121                RowSelector::skip(767),
2122                RowSelector::select(1),
2123            ]),
2124        ];
2125
2126        for selection in selections {
2127            let expected = selection.row_count();
2128            // Read data
2129            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2130                tokio::fs::File::from_std(file.try_clone().unwrap()),
2131                ArrowReaderOptions::new().with_page_index(true),
2132            )
2133            .await
2134            .unwrap();
2135
2136            reader = reader.with_row_selection(selection);
2137
2138            let mut stream = reader.build().unwrap();
2139
2140            let mut total_rows = 0;
2141            while let Some(rb) = stream.next().await {
2142                let rb = rb.unwrap();
2143                total_rows += rb.num_rows();
2144            }
2145            assert_eq!(total_rows, expected);
2146        }
2147    }
2148
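    // Apply a `RowFilter` whose second predicate reads a leaf inside a nested struct column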
2149    #[tokio::test]
2150    async fn test_row_filter_nested() {
2151        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2152        let b = StructArray::from(vec![
2153            (
2154                Arc::new(Field::new("aa", DataType::Utf8, true)),
2155                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2156            ),
2157            (
2158                Arc::new(Field::new("bb", DataType::Utf8, true)),
2159                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2160            ),
2161        ]);
2162        let c = Int32Array::from_iter(0..6);
2163        let data = RecordBatch::try_from_iter([
2164            ("a", Arc::new(a) as ArrayRef),
2165            ("b", Arc::new(b) as ArrayRef),
2166            ("c", Arc::new(c) as ArrayRef),
2167        ])
2168        .unwrap();
2169
2170        let mut buf = Vec::with_capacity(1024);
2171        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2172        writer.write(&data).unwrap();
2173        writer.close().unwrap();
2174
2175        let data: Bytes = buf.into();
2176        let metadata = ParquetMetaDataReader::new()
2177            .parse_and_finish(&data)
2178            .unwrap();
2179        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2180
2181        let test = TestReader {
2182            data,
2183            metadata: Arc::new(metadata),
2184            requests: Default::default(),
2185        };
2186        let requests = test.requests.clone();
2187
2188        let a_scalar = StringArray::from_iter_values(["b"]);
2189        let a_filter = ArrowPredicateFn::new(
2190            ProjectionMask::leaves(&parquet_schema, vec![0]),
2191            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2192        );
2193
2194        let b_scalar = StringArray::from_iter_values(["4"]);
2195        let b_filter = ArrowPredicateFn::new(
2196            ProjectionMask::leaves(&parquet_schema, vec![2]),
2197            move |batch| {
2198                // Filter on the second field of the struct (`b.bb`); the projection selects only that leaf, so it is column 0 here.
2199                let struct_array = batch
2200                    .column(0)
2201                    .as_any()
2202                    .downcast_ref::<StructArray>()
2203                    .unwrap();
2204                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2205            },
2206        );
2207
2208        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2209
2210        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2211        let stream = ParquetRecordBatchStreamBuilder::new(test)
2212            .await
2213            .unwrap()
2214            .with_projection(mask.clone())
2215            .with_batch_size(1024)
2216            .with_row_filter(filter)
2217            .build()
2218            .unwrap();
2219
2220        let batches: Vec<_> = stream.try_collect().await.unwrap();
2221        assert_eq!(batches.len(), 1);
2222
2223        let batch = &batches[0];
2224        assert_eq!(batch.num_rows(), 1);
2225        assert_eq!(batch.num_columns(), 2);
2226
2227        let col = batch.column(0);
2228        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2229        assert_eq!(val, "b");
2230
2231        let col = batch.column(1);
2232        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2233        assert_eq!(val, 3);
2234
2235        // Should only have made 3 requests
2236        assert_eq!(requests.lock().unwrap().len(), 3);
2237    }
2238
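    // Regression test: reading a row group with an explicitly empty offset index must not panic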
2239    #[tokio::test]
2240    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2241        use tokio::fs::File;
2242        let testdata = arrow::util::test_util::parquet_test_data();
2243        let path = format!("{testdata}/alltypes_plain.parquet");
2244        let mut file = File::open(&path).await.unwrap();
2245        let file_size = file.metadata().await.unwrap().len();
2246        let mut metadata = ParquetMetaDataReader::new()
2247            .with_page_indexes(true)
2248            .load_and_finish(&mut file, file_size as usize)
2249            .await
2250            .unwrap();
2251
2252        metadata.set_offset_index(Some(vec![]));
2253        let options = ArrowReaderOptions::new().with_page_index(true);
2254        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2255        let reader =
2256            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2257                .build()
2258                .unwrap();
2259
2260        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2261        assert_eq!(result.len(), 1);
2262    }
2263
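    // Regression test: reading with a populated offset index and the page index enabled must not panic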
2264    #[tokio::test]
2265    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2266        use tokio::fs::File;
2267        let testdata = arrow::util::test_util::parquet_test_data();
2268        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2269        let mut file = File::open(&path).await.unwrap();
2270        let file_size = file.metadata().await.unwrap().len();
2271        let metadata = ParquetMetaDataReader::new()
2272            .with_page_indexes(true)
2273            .load_and_finish(&mut file, file_size as usize)
2274            .await
2275            .unwrap();
2276
2277        let options = ArrowReaderOptions::new().with_page_index(true);
2278        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2279        let reader =
2280            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2281                .build()
2282                .unwrap();
2283
2284        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2285        assert_eq!(result.len(), 8);
2286    }
2287
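    // Regression test: reading column chunks after round-tripping the metadata through
    // `ParquetMetaDataWriter` must not panic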
2288    #[tokio::test]
2289    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2290        use tempfile::TempDir;
2291        use tokio::fs::File;
2292        fn write_metadata_to_local_file(
2293            metadata: ParquetMetaData,
2294            file: impl AsRef<std::path::Path>,
2295        ) {
2296            use crate::file::metadata::ParquetMetaDataWriter;
2297            use std::fs::File;
2298            let file = File::create(file).unwrap();
2299            ParquetMetaDataWriter::new(file, &metadata)
2300                .finish()
2301                .unwrap()
2302        }
2303
2304        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2305            use std::fs::File;
2306            let file = File::open(file).unwrap();
2307            ParquetMetaDataReader::new()
2308                .with_page_indexes(true)
2309                .parse_and_finish(&file)
2310                .unwrap()
2311        }
2312
2313        let testdata = arrow::util::test_util::parquet_test_data();
2314        let path = format!("{testdata}/alltypes_plain.parquet");
2315        let mut file = File::open(&path).await.unwrap();
2316        let file_size = file.metadata().await.unwrap().len();
2317        let metadata = ParquetMetaDataReader::new()
2318            .with_page_indexes(true)
2319            .load_and_finish(&mut file, file_size as usize)
2320            .await
2321            .unwrap();
2322
2323        let tempdir = TempDir::new().unwrap();
2324        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2325        write_metadata_to_local_file(metadata, &metadata_path);
2326        let metadata = read_metadata_from_local_file(&metadata_path);
2327
2328        let options = ArrowReaderOptions::new().with_page_index(true);
2329        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2330        let reader =
2331            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2332                .build()
2333                .unwrap();
2334
2335        // Should not panic here
2336        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2337        assert_eq!(result.len(), 1);
2338    }
2339}