parquet/arrow/async_reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! `async` API for reading Parquet files as [`RecordBatch`]es
//!
//! See the [crate-level documentation](crate) for more details.
//!
//! See example on [`ParquetRecordBatchStreamBuilder::new`]

use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
///
/// Notes:
///
/// 1. There is a default implementation for types that implement [`AsyncRead`]
///    and [`AsyncSeek`], for example [`tokio::fs::File`].
///
/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
///    is enabled, implements this interface for [`ObjectStore`].
///
/// [`ObjectStore`]: object_store::ObjectStore
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
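///
/// # Example: a custom in-memory implementation (a sketch)
///
/// The `InMemoryReader` wrapper below is a hypothetical type, not part of this
/// crate, shown only to illustrate what a minimal implementation looks like:
/// it serves byte ranges from a [`Bytes`] buffer and parses the footer on demand.
///
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use futures::future::{BoxFuture, FutureExt};
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// struct InMemoryReader(Bytes);
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Serve the requested range directly from the in-memory buffer
///         let data = self.0.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // Parse the footer from the buffered bytes
///         let data = self.0.clone();
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```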
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
    ///
    /// This is an asynchronous operation as it may involve reading the file
    /// footer and potentially other metadata from disk or a remote source.
    ///
    /// Reading data from Parquet requires the metadata to understand the
    /// schema, row groups, and location of pages within the file. This metadata
    /// is stored primarily in the footer of the Parquet file, and can be read using
    /// [`ParquetMetaDataReader`].
    ///
    /// However, implementations can significantly speed up reading Parquet by
    /// supplying cached metadata or pre-fetched metadata via this API.
    ///
    /// # Parameters
    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
    ///   and other options that affect how the metadata is read.
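    ///
    /// # Example: returning cached metadata (a sketch)
    ///
    /// One way to exploit this is a wrapper that delegates data reads but serves
    /// pre-loaded metadata without further I/O. The `CachedMetadataReader` type
    /// below is hypothetical and shown only to illustrate the idea.
    ///
    /// ```
    /// # use std::ops::Range;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use futures::future::{BoxFuture, FutureExt};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::async_reader::AsyncFileReader;
    /// # use parquet::errors::Result;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// struct CachedMetadataReader<R> {
    ///     inner: R,
    ///     metadata: Arc<ParquetMetaData>,
    /// }
    ///
    /// impl<R: AsyncFileReader> AsyncFileReader for CachedMetadataReader<R> {
    ///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
    ///         // Data reads still go to the underlying reader
    ///         self.inner.get_bytes(range)
    ///     }
    ///
    ///     fn get_metadata<'a>(
    ///         &'a mut self,
    ///         _options: Option<&'a ArrowReaderOptions>,
    ///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
    ///         // Return the cached copy without touching the file again
    ///         let metadata = Arc::clone(&self.metadata);
    ///         async move { Ok(metadata) }.boxed()
    ///     }
    /// }
    /// ```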
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`]
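///
/// # Example (a sketch)
///
/// `"data.parquet"` is a placeholder path used only for illustration.
///
/// ```no_run
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # async fn example() {
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// // Erase the concrete reader type, e.g. to store different reader kinds together
/// let reader: Box<dyn AsyncFileReader> = Box::new(file);
/// # let _ = reader;
/// # }
/// ```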
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_indexes(options.is_some_and(|o| o.page_index));

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
    /// Returns a new [`ArrowReaderMetadata`] for this builder
    ///
    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
///
/// Allows sharing the same builder for both the sync and async versions, whilst also not
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
/// This can be used to decode a Parquet file in streaming fashion (without
/// downloading the whole file at once) from a remote source, such as an object store.
///
/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc.
/// they wish to be read by the resulting stream.
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
    /// specified source.
    ///
    /// # Example
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
    /// // another async I/O reader, such as a reader from an object store.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap();
    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
    /// // a stream that can be used to incrementally read the data in batches
    /// let stream = builder.build().unwrap();
    /// // In this example, we collect the stream into a Vec<RecordBatch>
    /// // but real applications would likely process the batches as they are read
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Demonstrate the results are as expected
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
    ///      ],
    ///  );
    /// # }
    /// ```
    ///
    /// # Example configuring options and reading metadata
    ///
    /// There are many options that control the behavior of the reader, such as
    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # use arrow_array::RecordBatch;
    /// # use arrow::util::pretty::pretty_format_batches;
    /// # use futures::TryStreamExt;
    /// #
    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    /// #
    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
    /// #     assert_eq!(
    /// #          &actual_lines, expected_lines,
    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
    /// #          expected_lines, actual_lines
    /// #      );
    /// #  }
    /// #
    /// # let testdata = arrow::util::test_util::parquet_test_data();
    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
    /// // As before, use tokio::fs::File to read data using async I/O.
    /// let file = tokio::fs::File::open(path).await.unwrap();
    ///
    /// // Configure options for reading from the async source; in this case we set the batch
    /// // size to 3, which produces batches of at most 3 rows at a time.
    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .with_batch_size(3);
    ///
    /// // We can also read the metadata to inspect the schema and other metadata
    /// // before actually reading the data
    /// let file_metadata = builder.metadata().file_metadata();
    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
    ///
    /// let stream = builder.with_projection(mask).build().unwrap();
    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
    /// // Print out the results
    /// assert_batches_eq(
    ///     &results,
    ///     &[
    ///         "+----------+-------------+-----------+",
    ///         "| bool_col | tinyint_col | float_col |",
    ///         "+----------+-------------+-----------+",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "| true     | 0           | 0.0       |",
    ///         "| false    | 1           | 1.1       |",
    ///         "+----------+-------------+-----------+",
    ///      ],
    ///  );
    ///
    /// // The results have 8 rows, and since we set the batch size to 3, we expect
    /// // 3 batches: two with 3 rows each and the last batch with 2 rows.
    /// assert_eq!(results.len(), 3);
    /// # }
    /// ```
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
    /// and [`ArrowReaderOptions`].
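    ///
    /// # Example (a sketch)
    ///
    /// A minimal illustration of passing options; `"data.parquet"` is a
    /// placeholder path.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # async fn example() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// // Also load the page index, which enables more selective reads
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
    ///     .await
    ///     .unwrap();
    /// # let _ = builder;
    /// # }
    /// ```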
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
    ///
    /// This allows loading metadata once and using it to create multiple builders with
    /// potentially different settings that can then be read in parallel.
    ///
    /// # Example of reading from multiple streams in parallel
    ///
    /// ```
    /// # use std::fs::metadata;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
    /// # use tempfile::tempfile;
    /// # use futures::StreamExt;
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// #
    /// # let mut file = tempfile().unwrap();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// // open file with parquet data
    /// let mut file = tokio::fs::File::from_std(file);
    /// // load metadata once
    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
    /// // create two readers, a and b, from the same underlying file
    /// // without reading the metadata again
    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
    ///     file.try_clone().await.unwrap(),
    ///     meta.clone()
    /// ).build().unwrap();
    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
    ///
    /// // Can read batches from both readers in parallel
    /// assert_eq!(
    ///   a.next().await.unwrap().unwrap(),
    ///   b.next().await.unwrap().unwrap(),
    /// );
    /// # }
    /// ```
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This function is best called after other forms of pruning, such as projection and
    /// predicate pushdown, have been applied.
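    ///
    /// # Example (a sketch)
    ///
    /// `"data.parquet"` is a placeholder path, and the example assumes the file
    /// was written with bloom filters enabled.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    /// // Probe the filter for row group 0, column 0
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await.unwrap() {
    ///     // `check` may return false positives but never false negatives
    ///     let may_contain = sbbf.check(&42_i64);
    ///     println!("value 42 may be present: {may_contain}");
    /// }
    /// # }
    /// ```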
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`]
    ///
    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Try to avoid allocating a large buffer
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Ensure schema of ParquetRecordBatchStream respects projection, and does
        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}

type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
/// [`ParquetRecordBatchReader`]
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    fields: Option<Arc<ParquetField>>,

    input: T,

    filter: Option<RowFilter>,

    limit: Option<usize>,

    offset: Option<usize>,
}

impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
    ///
    /// Note: this captures self so that the resulting future has a static lifetime
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        // TODO: calling build_array multiple times is wasteful

        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            // schema: meta.schema_descr_ptr(),
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        // Update selection based on any filters
        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Compute the number of rows in the selection before applying limit and offset
        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        // Compute the number of rows in the selection after applying limit and offset
        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        // Update offset if necessary
        if let Some(offset) = &mut self.offset {
            // The reduction may come from either the offset or the limit; since the limit
            // only applies after the offset has been exhausted, a saturating sub is sufficient
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}

enum StreamState<T> {
    /// At the start of a new row group, or the end of the parquet stream
    Init,
    /// Decoding a batch
    Decoding(ParquetRecordBatchReader),
    /// Reading data from input
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Error
    Error,
}

impl<T> std::fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamState::Init => write!(f, "StreamState::Init"),
            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
            StreamState::Error => write!(f, "StreamState::Error"),
        }
    }
}

/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
///
/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
/// allowing users to decode record batches separately from I/O.
///
/// # I/O Buffering
///
/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection, filtering, etc.) and decodes the rows from those buffered pages.
///
/// For example, if all rows and columns are selected, the entire row group is
/// buffered in memory during decode. This minimizes the number of IO operations
/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    schema: SchemaRef,

    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    selection: Option<RowSelection>,

    /// This is an option so it can be moved into a future
    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .field("projection", &self.projection)
            .field("state", &self.state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// Note that the schema metadata will be stripped here. See
    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the next row group from the stream.
    ///
    /// Users can continue to call this function to get row groups and decode them concurrently.
    ///
    /// ## Notes
    ///
    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
    ///
    /// ## Returns
    ///
    /// - `Ok(None)` if the stream has ended.
    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
    /// - `Ok(Some(reader))` which holds all the data for the row group.
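    ///
    /// ## Example (a sketch)
    ///
    /// `"data.parquet"` is a placeholder path.
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() {
    /// let file = tokio::fs::File::open("data.parquet").await.unwrap();
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    /// // Fetch each row group (performs the I/O), then decode its batches
    /// while let Some(reader) = stream.next_row_group().await.unwrap() {
    ///     for batch in reader {
    ///         println!("read {} rows", batch.unwrap().num_rows());
    ///     }
    /// }
    /// # }
    /// ```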
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // All rows skipped, read next row group
                        continue;
                    }
                }
                StreamState::Error => return Ok(None), // Ends the stream, as an error has occurred.
            }
        }
    }
}
802
803impl<T> Stream for ParquetRecordBatchStream<T>
804where
805    T: AsyncFileReader + Unpin + Send + 'static,
806{
807    type Item = Result<RecordBatch>;
808
809    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
810        loop {
811            match &mut self.state {
812                StreamState::Decoding(batch_reader) => match batch_reader.next() {
813                    Some(Ok(batch)) => {
814                        return Poll::Ready(Some(Ok(batch)));
815                    }
816                    Some(Err(e)) => {
817                        self.state = StreamState::Error;
818                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
819                    }
820                    None => self.state = StreamState::Init,
821                },
822                StreamState::Init => {
823                    let row_group_idx = match self.row_groups.pop_front() {
824                        Some(idx) => idx,
825                        None => return Poll::Ready(None),
826                    };
827
828                    let reader = self.reader_factory.take().expect("lost reader factory");
829
830                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
831
832                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
833
834                    let fut = reader
835                        .read_row_group(
836                            row_group_idx,
837                            selection,
838                            self.projection.clone(),
839                            self.batch_size,
840                        )
841                        .boxed();
842
843                    self.state = StreamState::Reading(fut)
844                }
845                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
846                    Ok((reader_factory, maybe_reader)) => {
847                        self.reader_factory = Some(reader_factory);
848                        match maybe_reader {
849                            // Read records from [`ParquetRecordBatchReader`]
850                            Some(reader) => self.state = StreamState::Decoding(reader),
851                            // All rows skipped, read next row group
852                            None => self.state = StreamState::Init,
853                        }
854                    }
855                    Err(e) => {
856                        self.state = StreamState::Error;
857                        return Poll::Ready(Some(Err(e)));
858                    }
859                },
860                StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
861            }
862        }
863    }
864}
865
/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Column chunks for this row group
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
    /// Fetches any additional column data specified in `projection` that is not already
    /// present in `self.column_chunks`.
    ///
    /// If `selection` is provided, only the pages required for the selection
    /// are fetched. Otherwise, all pages are fetched.
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
            // `RowSelection`
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page.
                    let mut ranges: Vec<Range<u64>> = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Return chunks for column i
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Subset of data pages included in this sparse chunk.
        ///
        /// Each element is a tuple of (page offset within file, page data).
        /// Each entry is a complete page and the list is ordered by offset.
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and the offset within the original file
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Return the data for this column chunk at the given offset
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    /// Return the total length of the full column chunk
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}

1093#[cfg(test)]
1094mod tests {
1095    use super::*;
1096    use crate::arrow::arrow_reader::{
1097        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1098    };
1099    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1100    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1101    use crate::arrow::ArrowWriter;
1102    use crate::file::metadata::ParquetMetaDataReader;
1103    use crate::file::properties::WriterProperties;
1104    use arrow::compute::kernels::cmp::eq;
1105    use arrow::error::Result as ArrowResult;
1106    use arrow_array::builder::{ListBuilder, StringBuilder};
1107    use arrow_array::cast::AsArray;
1108    use arrow_array::types::Int32Type;
1109    use arrow_array::{
1110        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1111        StructArray, UInt64Array,
1112    };
1113    use arrow_schema::{DataType, Field, Schema};
1114    use futures::{StreamExt, TryStreamExt};
1115    use rand::{rng, Rng};
1116    use std::collections::HashMap;
1117    use std::sync::{Arc, Mutex};
1118    use tempfile::tempfile;
1119
1120    #[derive(Clone)]
1121    struct TestReader {
1122        data: Bytes,
1123        metadata: Option<Arc<ParquetMetaData>>,
1124        requests: Arc<Mutex<Vec<Range<usize>>>>,
1125    }
1126
1127    impl AsyncFileReader for TestReader {
1128        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1129            let range = range.clone();
1130            self.requests
1131                .lock()
1132                .unwrap()
1133                .push(range.start as usize..range.end as usize);
1134            futures::future::ready(Ok(self
1135                .data
1136                .slice(range.start as usize..range.end as usize)))
1137            .boxed()
1138        }
1139
1140        fn get_metadata<'a>(
1141            &'a mut self,
1142            options: Option<&'a ArrowReaderOptions>,
1143        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1144            let metadata_reader = ParquetMetaDataReader::new()
1145                .with_page_indexes(options.is_some_and(|o| o.page_index));
1146            self.metadata = Some(Arc::new(
1147                metadata_reader.parse_and_finish(&self.data).unwrap(),
1148            ));
1149            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1150        }
1151    }
1152
1153    #[tokio::test]
1154    async fn test_async_reader() {
1155        let testdata = arrow::util::test_util::parquet_test_data();
1156        let path = format!("{testdata}/alltypes_plain.parquet");
1157        let data = Bytes::from(std::fs::read(path).unwrap());
1158
1159        let async_reader = TestReader {
1160            data: data.clone(),
1161            metadata: Default::default(),
1162            requests: Default::default(),
1163        };
1164
1165        let requests = async_reader.requests.clone();
1166        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1167            .await
1168            .unwrap();
1169
1170        let metadata = builder.metadata().clone();
1171        assert_eq!(metadata.num_row_groups(), 1);
1172
1173        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1174        let stream = builder
1175            .with_projection(mask.clone())
1176            .with_batch_size(1024)
1177            .build()
1178            .unwrap();
1179
1180        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1181
1182        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1183            .unwrap()
1184            .with_projection(mask)
1185            .with_batch_size(104)
1186            .build()
1187            .unwrap()
1188            .collect::<ArrowResult<Vec<_>>>()
1189            .unwrap();
1190
1191        assert_eq!(async_batches, sync_batches);
1192
1193        let requests = requests.lock().unwrap();
1194        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1195        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1196
1197        assert_eq!(
1198            &requests[..],
1199            &[
1200                offset_1 as usize..(offset_1 + length_1) as usize,
1201                offset_2 as usize..(offset_2 + length_2) as usize
1202            ]
1203        );
1204    }
1205
1206    #[tokio::test]
1207    async fn test_async_reader_with_next_row_group() {
1208        let testdata = arrow::util::test_util::parquet_test_data();
1209        let path = format!("{testdata}/alltypes_plain.parquet");
1210        let data = Bytes::from(std::fs::read(path).unwrap());
1211
1212        let async_reader = TestReader {
1213            data: data.clone(),
1214            metadata: Default::default(),
1215            requests: Default::default(),
1216        };
1217
1218        let requests = async_reader.requests.clone();
1219        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1220            .await
1221            .unwrap();
1222
1223        let metadata = builder.metadata().clone();
1224        assert_eq!(metadata.num_row_groups(), 1);
1225
1226        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1227        let mut stream = builder
1228            .with_projection(mask.clone())
1229            .with_batch_size(1024)
1230            .build()
1231            .unwrap();
1232
1233        let mut readers = vec![];
1234        while let Some(reader) = stream.next_row_group().await.unwrap() {
1235            readers.push(reader);
1236        }
1237
1238        let async_batches: Vec<_> = readers
1239            .into_iter()
1240            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1241            .collect();
1242
1243        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1244            .unwrap()
1245            .with_projection(mask)
1246            .with_batch_size(104)
1247            .build()
1248            .unwrap()
1249            .collect::<ArrowResult<Vec<_>>>()
1250            .unwrap();
1251
1252        assert_eq!(async_batches, sync_batches);
1253
1254        let requests = requests.lock().unwrap();
1255        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1256        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1257
1258        assert_eq!(
1259            &requests[..],
1260            &[
1261                offset_1 as usize..(offset_1 + length_1) as usize,
1262                offset_2 as usize..(offset_2 + length_2) as usize
1263            ]
1264        );
1265    }
1266
1267    #[tokio::test]
1268    async fn test_async_reader_with_index() {
1269        let testdata = arrow::util::test_util::parquet_test_data();
1270        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1271        let data = Bytes::from(std::fs::read(path).unwrap());
1272
1273        let async_reader = TestReader {
1274            data: data.clone(),
1275            metadata: Default::default(),
1276            requests: Default::default(),
1277        };
1278
1279        let options = ArrowReaderOptions::new().with_page_index(true);
1280        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1281            .await
1282            .unwrap();
1283
1284        // The builder should have page and offset indexes loaded now
1285        let metadata_with_index = builder.metadata();
1286        assert_eq!(metadata_with_index.num_row_groups(), 1);
1287
1288        // Check offset indexes are present for all columns
1289        let offset_index = metadata_with_index.offset_index().unwrap();
1290        let column_index = metadata_with_index.column_index().unwrap();
1291
1292        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1293        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1294
1295        let num_columns = metadata_with_index
1296            .file_metadata()
1297            .schema_descr()
1298            .num_columns();
1299
1300        // Check page indexes are present for all columns
1301        offset_index
1302            .iter()
1303            .for_each(|x| assert_eq!(x.len(), num_columns));
1304        column_index
1305            .iter()
1306            .for_each(|x| assert_eq!(x.len(), num_columns));
1307
1308        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1309        let stream = builder
1310            .with_projection(mask.clone())
1311            .with_batch_size(1024)
1312            .build()
1313            .unwrap();
1314
1315        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1316
1317        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1318            .unwrap()
1319            .with_projection(mask)
1320            .with_batch_size(1024)
1321            .build()
1322            .unwrap()
1323            .collect::<ArrowResult<Vec<_>>>()
1324            .unwrap();
1325
1326        assert_eq!(async_batches, sync_batches);
1327    }
1328
1329    #[tokio::test]
1330    async fn test_async_reader_with_limit() {
1331        let testdata = arrow::util::test_util::parquet_test_data();
1332        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1333        let data = Bytes::from(std::fs::read(path).unwrap());
1334
1335        let metadata = ParquetMetaDataReader::new()
1336            .parse_and_finish(&data)
1337            .unwrap();
1338        let metadata = Arc::new(metadata);
1339
1340        assert_eq!(metadata.num_row_groups(), 1);
1341
1342        let async_reader = TestReader {
1343            data: data.clone(),
1344            metadata: Default::default(),
1345            requests: Default::default(),
1346        };
1347
1348        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1349            .await
1350            .unwrap();
1351
1352        assert_eq!(builder.metadata().num_row_groups(), 1);
1353
1354        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1355        let stream = builder
1356            .with_projection(mask.clone())
1357            .with_batch_size(1024)
1358            .with_limit(1)
1359            .build()
1360            .unwrap();
1361
1362        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1363
1364        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1365            .unwrap()
1366            .with_projection(mask)
1367            .with_batch_size(1024)
1368            .with_limit(1)
1369            .build()
1370            .unwrap()
1371            .collect::<ArrowResult<Vec<_>>>()
1372            .unwrap();
1373
1374        assert_eq!(async_batches, sync_batches);
1375    }
1376
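    // A `RowSelection` that skips and selects rows across page boundaries should
    // produce the same batches from the async stream as from the sync reader.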
1377    #[tokio::test]
1378    async fn test_async_reader_skip_pages() {
1379        let testdata = arrow::util::test_util::parquet_test_data();
1380        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1381        let data = Bytes::from(std::fs::read(path).unwrap());
1382
1383        let async_reader = TestReader {
1384            data: data.clone(),
1385            metadata: Default::default(),
1386            requests: Default::default(),
1387        };
1388
1389        let options = ArrowReaderOptions::new().with_page_index(true);
1390        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1391            .await
1392            .unwrap();
1393
1394        assert_eq!(builder.metadata().num_row_groups(), 1);
1395
1396        let selection = RowSelection::from(vec![
1397            RowSelector::skip(21),   // Skip first page
1398            RowSelector::select(21), // Select page to boundary
1399            RowSelector::skip(41),   // Skip multiple pages
1400            RowSelector::select(41), // Select multiple pages
1401            RowSelector::skip(25),   // Skip page across boundary
1402            RowSelector::select(25), // Select across page boundary
1403            RowSelector::skip(7116), // Skip to final page boundary
1404            RowSelector::select(10), // Select final page
1405        ]);
1406
1407        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1408
1409        let stream = builder
1410            .with_projection(mask.clone())
1411            .with_row_selection(selection.clone())
1412            .build()
1413            .expect("building stream");
1414
1415        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1416
1417        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1418            .unwrap()
1419            .with_projection(mask)
1420            .with_batch_size(1024)
1421            .with_row_selection(selection)
1422            .build()
1423            .unwrap()
1424            .collect::<ArrowResult<Vec<_>>>()
1425            .unwrap();
1426
1427        assert_eq!(async_batches, sync_batches);
1428    }
1429
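    // Fuzz test: 100 rounds of random skip/select patterns over a randomly chosen
    // column; the number of rows returned must equal the number of selected rows.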
1430    #[tokio::test]
1431    async fn test_fuzz_async_reader_selection() {
1432        let testdata = arrow::util::test_util::parquet_test_data();
1433        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1434        let data = Bytes::from(std::fs::read(path).unwrap());
1435
1436        let mut rand = rng();
1437
1438        for _ in 0..100 {
1439            let mut expected_rows = 0;
1440            let mut total_rows = 0;
1441            let mut skip = false;
1442            let mut selectors = vec![];
1443
1444            while total_rows < 7300 {
1445                let row_count: usize = rand.random_range(1..100);
1446
1447                let row_count = row_count.min(7300 - total_rows);
1448
1449                selectors.push(RowSelector { row_count, skip });
1450
1451                total_rows += row_count;
1452                if !skip {
1453                    expected_rows += row_count;
1454                }
1455
1456                skip = !skip;
1457            }
1458
1459            let selection = RowSelection::from(selectors);
1460
1461            let async_reader = TestReader {
1462                data: data.clone(),
1463                metadata: Default::default(),
1464                requests: Default::default(),
1465            };
1466
1467            let options = ArrowReaderOptions::new().with_page_index(true);
1468            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1469                .await
1470                .unwrap();
1471
1472            assert_eq!(builder.metadata().num_row_groups(), 1);
1473
1474            let col_idx: usize = rand.random_range(0..13);
1475            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1476
1477            let stream = builder
1478                .with_projection(mask.clone())
1479                .with_row_selection(selection.clone())
1480                .build()
1481                .expect("building stream");
1482
1483            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1484
1485            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1486
1487            assert_eq!(actual_rows, expected_rows);
1488        }
1489    }
1490
1491    #[tokio::test]
1492    async fn test_async_reader_zero_row_selector() {
1493        // See https://github.com/apache/arrow-rs/issues/2669: a zero-row selector at the start of the selection should be handled correctly
1494        let testdata = arrow::util::test_util::parquet_test_data();
1495        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1496        let data = Bytes::from(std::fs::read(path).unwrap());
1497
1498        let mut rand = rng();
1499
1500        let mut expected_rows = 0;
1501        let mut total_rows = 0;
1502        let mut skip = false;
1503        let mut selectors = vec![];
1504
1505        selectors.push(RowSelector {
1506            row_count: 0,
1507            skip: false,
1508        });
1509
1510        while total_rows < 7300 {
1511            let row_count: usize = rand.random_range(1..100);
1512
1513            let row_count = row_count.min(7300 - total_rows);
1514
1515            selectors.push(RowSelector { row_count, skip });
1516
1517            total_rows += row_count;
1518            if !skip {
1519                expected_rows += row_count;
1520            }
1521
1522            skip = !skip;
1523        }
1524
1525        let selection = RowSelection::from(selectors);
1526
1527        let async_reader = TestReader {
1528            data: data.clone(),
1529            metadata: Default::default(),
1530            requests: Default::default(),
1531        };
1532
1533        let options = ArrowReaderOptions::new().with_page_index(true);
1534        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1535            .await
1536            .unwrap();
1537
1538        assert_eq!(builder.metadata().num_row_groups(), 1);
1539
1540        let col_idx: usize = rand.random_range(0..13);
1541        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1542
1543        let stream = builder
1544            .with_projection(mask.clone())
1545            .with_row_selection(selection.clone())
1546            .build()
1547            .expect("building stream");
1548
1549        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1550
1551        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1552
1553        assert_eq!(actual_rows, expected_rows);
1554    }
1555
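    // Two chained `ArrowPredicateFn` filters (on columns "a" and "b") should leave
    // a single matching row; the test also checks how many byte-range requests the
    // reader issues while evaluating them.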
1556    #[tokio::test]
1557    async fn test_row_filter() {
1558        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1559        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1560        let c = Int32Array::from_iter(0..6);
1561        let data = RecordBatch::try_from_iter([
1562            ("a", Arc::new(a) as ArrayRef),
1563            ("b", Arc::new(b) as ArrayRef),
1564            ("c", Arc::new(c) as ArrayRef),
1565        ])
1566        .unwrap();
1567
1568        let mut buf = Vec::with_capacity(1024);
1569        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1570        writer.write(&data).unwrap();
1571        writer.close().unwrap();
1572
1573        let data: Bytes = buf.into();
1574        let metadata = ParquetMetaDataReader::new()
1575            .parse_and_finish(&data)
1576            .unwrap();
1577        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1578
1579        let test = TestReader {
1580            data,
1581            metadata: Default::default(),
1582            requests: Default::default(),
1583        };
1584        let requests = test.requests.clone();
1585
1586        let a_scalar = StringArray::from_iter_values(["b"]);
1587        let a_filter = ArrowPredicateFn::new(
1588            ProjectionMask::leaves(&parquet_schema, vec![0]),
1589            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1590        );
1591
1592        let b_scalar = StringArray::from_iter_values(["4"]);
1593        let b_filter = ArrowPredicateFn::new(
1594            ProjectionMask::leaves(&parquet_schema, vec![1]),
1595            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1596        );
1597
1598        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1599
1600        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1601        let stream = ParquetRecordBatchStreamBuilder::new(test)
1602            .await
1603            .unwrap()
1604            .with_projection(mask.clone())
1605            .with_batch_size(1024)
1606            .with_row_filter(filter)
1607            .build()
1608            .unwrap();
1609
1610        let batches: Vec<_> = stream.try_collect().await.unwrap();
1611        assert_eq!(batches.len(), 1);
1612
1613        let batch = &batches[0];
1614        assert_eq!(batch.num_rows(), 1);
1615        assert_eq!(batch.num_columns(), 2);
1616
1617        let col = batch.column(0);
1618        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1619        assert_eq!(val, "b");
1620
1621        let col = batch.column(1);
1622        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1623        assert_eq!(val, 3);
1624
1625        // Should only have made 3 requests
1626        assert_eq!(requests.lock().unwrap().len(), 3);
1627    }
1628
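    // Checks limit and offset handling across multiple row groups
    // (max_row_group_size = 3), including the case where the offset skips an
    // entire row group.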
1629    #[tokio::test]
1630    async fn test_limit_multiple_row_groups() {
1631        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1632        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1633        let c = Int32Array::from_iter(0..6);
1634        let data = RecordBatch::try_from_iter([
1635            ("a", Arc::new(a) as ArrayRef),
1636            ("b", Arc::new(b) as ArrayRef),
1637            ("c", Arc::new(c) as ArrayRef),
1638        ])
1639        .unwrap();
1640
1641        let mut buf = Vec::with_capacity(1024);
1642        let props = WriterProperties::builder()
1643            .set_max_row_group_size(3)
1644            .build();
1645        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1646        writer.write(&data).unwrap();
1647        writer.close().unwrap();
1648
1649        let data: Bytes = buf.into();
1650        let metadata = ParquetMetaDataReader::new()
1651            .parse_and_finish(&data)
1652            .unwrap();
1653
1654        assert_eq!(metadata.num_row_groups(), 2);
1655
1656        let test = TestReader {
1657            data,
1658            metadata: Default::default(),
1659            requests: Default::default(),
1660        };
1661
1662        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1663            .await
1664            .unwrap()
1665            .with_batch_size(1024)
1666            .with_limit(4)
1667            .build()
1668            .unwrap();
1669
1670        let batches: Vec<_> = stream.try_collect().await.unwrap();
1671        // Expect one batch for each row group
1672        assert_eq!(batches.len(), 2);
1673
1674        let batch = &batches[0];
1675        // First batch should contain all rows of the first row group
1676        assert_eq!(batch.num_rows(), 3);
1677        assert_eq!(batch.num_columns(), 3);
1678        let col2 = batch.column(2).as_primitive::<Int32Type>();
1679        assert_eq!(col2.values(), &[0, 1, 2]);
1680
1681        let batch = &batches[1];
1682        // Second batch should trigger the limit and only have one row
1683        assert_eq!(batch.num_rows(), 1);
1684        assert_eq!(batch.num_columns(), 3);
1685        let col2 = batch.column(2).as_primitive::<Int32Type>();
1686        assert_eq!(col2.values(), &[3]);
1687
1688        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1689            .await
1690            .unwrap()
1691            .with_offset(2)
1692            .with_limit(3)
1693            .build()
1694            .unwrap();
1695
1696        let batches: Vec<_> = stream.try_collect().await.unwrap();
1697        // Expect one batch for each row group
1698        assert_eq!(batches.len(), 2);
1699
1700        let batch = &batches[0];
1701        // First batch should contain one row
1702        assert_eq!(batch.num_rows(), 1);
1703        assert_eq!(batch.num_columns(), 3);
1704        let col2 = batch.column(2).as_primitive::<Int32Type>();
1705        assert_eq!(col2.values(), &[2]);
1706
1707        let batch = &batches[1];
1708        // Second batch should contain two rows
1709        assert_eq!(batch.num_rows(), 2);
1710        assert_eq!(batch.num_columns(), 3);
1711        let col2 = batch.column(2).as_primitive::<Int32Type>();
1712        assert_eq!(col2.values(), &[3, 4]);
1713
1714        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1715            .await
1716            .unwrap()
1717            .with_offset(4)
1718            .with_limit(20)
1719            .build()
1720            .unwrap();
1721
1722        let batches: Vec<_> = stream.try_collect().await.unwrap();
1723        // Should skip first row group
1724        assert_eq!(batches.len(), 1);
1725
1726        let batch = &batches[0];
1727        // First batch should contain two rows
1728        assert_eq!(batch.num_rows(), 2);
1729        assert_eq!(batch.num_columns(), 3);
1730        let col2 = batch.column(2).as_primitive::<Int32Type>();
1731        assert_eq!(col2.values(), &[4, 5]);
1732    }
1733
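    // Applies a `RowFilter` with the page index enabled and checks the total
    // number of rows that survive the predicates.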
1734    #[tokio::test]
1735    async fn test_row_filter_with_index() {
1736        let testdata = arrow::util::test_util::parquet_test_data();
1737        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1738        let data = Bytes::from(std::fs::read(path).unwrap());
1739
1740        let metadata = ParquetMetaDataReader::new()
1741            .parse_and_finish(&data)
1742            .unwrap();
1743        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1744
1745        assert_eq!(metadata.num_row_groups(), 1);
1746
1747        let async_reader = TestReader {
1748            data: data.clone(),
1749            metadata: Default::default(),
1750            requests: Default::default(),
1751        };
1752
1753        let a_filter =
1754            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1755                Ok(batch.column(0).as_boolean().clone())
1756            });
1757
1758        let b_scalar = Int8Array::from(vec![2]);
1759        let b_filter = ArrowPredicateFn::new(
1760            ProjectionMask::leaves(&parquet_schema, vec![2]),
1761            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1762        );
1763
1764        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1765
1766        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1767
1768        let options = ArrowReaderOptions::new().with_page_index(true);
1769        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1770            .await
1771            .unwrap()
1772            .with_projection(mask.clone())
1773            .with_batch_size(1024)
1774            .with_row_filter(filter)
1775            .build()
1776            .unwrap();
1777
1778        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1779
1780        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1781
1782        assert_eq!(total_rows, 730);
1783    }
1784
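    // Builds a `RowSelection` that skips every other page and verifies that only
    // the byte ranges of the selected pages are fetched from the reader.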
1785    #[tokio::test]
1786    async fn test_in_memory_row_group_sparse() {
1787        let testdata = arrow::util::test_util::parquet_test_data();
1788        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1789        let data = Bytes::from(std::fs::read(path).unwrap());
1790
1791        let metadata = ParquetMetaDataReader::new()
1792            .with_page_indexes(true)
1793            .parse_and_finish(&data)
1794            .unwrap();
1795
1796        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1797
1798        let mut metadata_builder = metadata.into_builder();
1799        let mut row_groups = metadata_builder.take_row_groups();
1800        row_groups.truncate(1);
1801        let row_group_meta = row_groups.pop().unwrap();
1802
1803        let metadata = metadata_builder
1804            .add_row_group(row_group_meta)
1805            .set_column_index(None)
1806            .set_offset_index(Some(vec![offset_index.clone()]))
1807            .build();
1808
1809        let metadata = Arc::new(metadata);
1810
1811        let num_rows = metadata.row_group(0).num_rows();
1812
1813        assert_eq!(metadata.num_row_groups(), 1);
1814
1815        let async_reader = TestReader {
1816            data: data.clone(),
1817            metadata: Default::default(),
1818            requests: Default::default(),
1819        };
1820
1821        let requests = async_reader.requests.clone();
1822        let (_, fields) = parquet_to_arrow_schema_and_fields(
1823            metadata.file_metadata().schema_descr(),
1824            ProjectionMask::all(),
1825            None,
1826        )
1827        .unwrap();
1828
1829        let _schema_desc = metadata.file_metadata().schema_descr();
1830
1831        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1832
1833        let reader_factory = ReaderFactory {
1834            metadata,
1835            fields: fields.map(Arc::new),
1836            input: async_reader,
1837            filter: None,
1838            limit: None,
1839            offset: None,
1840        };
1841
1842        let mut skip = true;
1843        let mut pages = offset_index[0].page_locations.iter().peekable();
1844
1845        // Set up `RowSelection` so that we can skip every other page, selecting the last page
1846        let mut selectors = vec![];
1847        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1848        while let Some(page) = pages.next() {
1849            let num_rows = if let Some(next_page) = pages.peek() {
1850                next_page.first_row_index - page.first_row_index
1851            } else {
1852                num_rows - page.first_row_index
1853            };
1854
1855            if skip {
1856                selectors.push(RowSelector::skip(num_rows as usize));
1857            } else {
1858                selectors.push(RowSelector::select(num_rows as usize));
1859                let start = page.offset as usize;
1860                let end = start + page.compressed_page_size as usize;
1861                expected_page_requests.push(start..end);
1862            }
1863            skip = !skip;
1864        }
1865
1866        let selection = RowSelection::from(selectors);
1867
1868        let (_factory, _reader) = reader_factory
1869            .read_row_group(0, Some(selection), projection.clone(), 48)
1870            .await
1871            .expect("reading row group");
1872
1873        let requests = requests.lock().unwrap();
1874
1875        assert_eq!(&requests[..], &expected_page_requests)
1876    }
1877
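    // The requested batch size should be clamped to the number of rows in the
    // file so the reader does not over-allocate.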
1878    #[tokio::test]
1879    async fn test_batch_size_overallocate() {
1880        let testdata = arrow::util::test_util::parquet_test_data();
1881        // `alltypes_plain.parquet` only has 8 rows
1882        let path = format!("{testdata}/alltypes_plain.parquet");
1883        let data = Bytes::from(std::fs::read(path).unwrap());
1884
1885        let async_reader = TestReader {
1886            data: data.clone(),
1887            metadata: Default::default(),
1888            requests: Default::default(),
1889        };
1890
1891        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1892            .await
1893            .unwrap();
1894
1895        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1896
1897        let stream = builder
1898            .with_projection(ProjectionMask::all())
1899            .with_batch_size(1024)
1900            .build()
1901            .unwrap();
1902        assert_ne!(1024, file_rows);
1903        assert_eq!(stream.batch_size, file_rows);
1904    }
1905
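    // Reads a bloom filter from a file whose column metadata does not record a
    // bloom_filter_length.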
1906    #[tokio::test]
1907    async fn test_get_row_group_column_bloom_filter_without_length() {
1908        let testdata = arrow::util::test_util::parquet_test_data();
1909        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1910        let data = Bytes::from(std::fs::read(path).unwrap());
1911        test_get_row_group_column_bloom_filter(data, false).await;
1912    }
1913
1914    #[tokio::test]
1915    async fn test_parquet_record_batch_stream_schema() {
1916        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1917            schema.flattened_fields().iter().map(|f| f.name()).collect()
1918        }
1919
1920        // ParquetRecordBatchReaderBuilder::schema differs from
1921        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
1922        // schema contents (the custom metadata attached to the schema, and which
1923        // fields are returned). This test ensures that behaviour remains consistent.
1924        //
1925        // The asynchronous versions are checked for the same behaviour.
1926
1927        // Prepare data for a schema with nested fields and custom metadata
1928        let mut metadata = HashMap::with_capacity(1);
1929        metadata.insert("key".to_string(), "value".to_string());
1930
1931        let nested_struct_array = StructArray::from(vec![
1932            (
1933                Arc::new(Field::new("d", DataType::Utf8, true)),
1934                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1935            ),
1936            (
1937                Arc::new(Field::new("e", DataType::Utf8, true)),
1938                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1939            ),
1940        ]);
1941        let struct_array = StructArray::from(vec![
1942            (
1943                Arc::new(Field::new("a", DataType::Int32, true)),
1944                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1945            ),
1946            (
1947                Arc::new(Field::new("b", DataType::UInt64, true)),
1948                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1949            ),
1950            (
1951                Arc::new(Field::new(
1952                    "c",
1953                    nested_struct_array.data_type().clone(),
1954                    true,
1955                )),
1956                Arc::new(nested_struct_array) as ArrayRef,
1957            ),
1958        ]);
1959
1960        let schema =
1961            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1962        let record_batch = RecordBatch::from(struct_array)
1963            .with_schema(schema.clone())
1964            .unwrap();
1965
1966        // Write parquet with custom metadata in schema
1967        let mut file = tempfile().unwrap();
1968        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1969        writer.write(&record_batch).unwrap();
1970        writer.close().unwrap();
1971
1972        let all_fields = ["a", "b", "c", "d", "e"];
1973        // (leaf indices in the mask, expected names of all fields in the output schema)
1974        let projections = [
1975            (vec![], vec![]),
1976            (vec![0], vec!["a"]),
1977            (vec![0, 1], vec!["a", "b"]),
1978            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1979            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1980        ];
1981
1982        // Ensure we're consistent for each of these projections
1983        for (indices, expected_projected_names) in projections {
1984            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1985                // Builder schema should preserve all fields and metadata
1986                assert_eq!(get_all_field_names(&builder), all_fields);
1987                assert_eq!(builder.metadata, metadata);
1988                // Reader & batch schema should show only projected fields, and no metadata
1989                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1990                assert_eq!(reader.metadata, HashMap::default());
1991                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1992                assert_eq!(batch.metadata, HashMap::default());
1993            };
1994
1995            let builder =
1996                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1997            let sync_builder_schema = builder.schema().clone();
1998            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1999            let mut reader = builder.with_projection(mask).build().unwrap();
2000            let sync_reader_schema = reader.schema();
2001            let batch = reader.next().unwrap().unwrap();
2002            let sync_batch_schema = batch.schema();
2003            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2004
2005            // The asynchronous reader should behave the same way
2006            let file = tokio::fs::File::from(file.try_clone().unwrap());
2007            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2008            let async_builder_schema = builder.schema().clone();
2009            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2010            let mut reader = builder.with_projection(mask).build().unwrap();
2011            let async_reader_schema = reader.schema().clone();
2012            let batch = reader.next().await.unwrap().unwrap();
2013            let async_batch_schema = batch.schema();
2014            assert_schemas(
2015                async_builder_schema,
2016                async_reader_schema,
2017                async_batch_schema,
2018            );
2019        }
2020    }
2021
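    // Rewrites the test data with bloom filters enabled so the column metadata
    // records a bloom_filter_length, then reads the filter back.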
2022    #[tokio::test]
2023    async fn test_get_row_group_column_bloom_filter_with_length() {
2024        // Rewrite the data to a new parquet file whose metadata records bloom_filter_length
2025        let testdata = arrow::util::test_util::parquet_test_data();
2026        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2027        let data = Bytes::from(std::fs::read(path).unwrap());
2028        let async_reader = TestReader {
2029            data: data.clone(),
2030            metadata: Default::default(),
2031            requests: Default::default(),
2032        };
2033        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2034            .await
2035            .unwrap();
2036        let schema = builder.schema().clone();
2037        let stream = builder.build().unwrap();
2038        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2039
2040        let mut parquet_data = Vec::new();
2041        let props = WriterProperties::builder()
2042            .set_bloom_filter_enabled(true)
2043            .build();
2044        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2045        for batch in batches {
2046            writer.write(&batch).unwrap();
2047        }
2048        writer.close().unwrap();
2049
2050        // test the new parquet file
2051        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2052    }
2053
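    // Shared helper: checks whether bloom_filter_length is present in the column
    // metadata and probes the bloom filter for a value that exists and one that
    // does not.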
2054    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2055        let async_reader = TestReader {
2056            data: data.clone(),
2057            metadata: Default::default(),
2058            requests: Default::default(),
2059        };
2060
2061        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2062            .await
2063            .unwrap();
2064
2065        let metadata = builder.metadata();
2066        assert_eq!(metadata.num_row_groups(), 1);
2067        let row_group = metadata.row_group(0);
2068        let column = row_group.column(0);
2069        assert_eq!(column.bloom_filter_length().is_some(), with_length);
2070
2071        let sbbf = builder
2072            .get_row_group_column_bloom_filter(0, 0)
2073            .await
2074            .unwrap()
2075            .unwrap();
2076        assert!(sbbf.check(&"Hello"));
2077        assert!(!sbbf.check(&"Hello_Not_Exists"));
2078    }
2079
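    // Verifies row skipping for a nested (list) column: each `RowSelection`
    // should yield exactly the number of rows it selects.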
2080    #[tokio::test]
2081    async fn test_nested_skip() {
2082        let schema = Arc::new(Schema::new(vec![
2083            Field::new("col_1", DataType::UInt64, false),
2084            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2085        ]));
2086
2087        // Writer properties forcing small data pages and row groups
2088        let props = WriterProperties::builder()
2089            .set_data_page_row_count_limit(256)
2090            .set_write_batch_size(256)
2091            .set_max_row_group_size(1024);
2092
2093        // Write data
2094        let mut file = tempfile().unwrap();
2095        let mut writer =
2096            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2097
2098        let mut builder = ListBuilder::new(StringBuilder::new());
2099        for id in 0..1024 {
2100            match id % 3 {
2101                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2102                1 => builder.append_value([Some(format!("id_{id}"))]),
2103                _ => builder.append_null(),
2104            }
2105        }
2106        let refs = vec![
2107            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2108            Arc::new(builder.finish()) as ArrayRef,
2109        ];
2110
2111        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2112        writer.write(&batch).unwrap();
2113        writer.close().unwrap();
2114
2115        let selections = [
2116            RowSelection::from(vec![
2117                RowSelector::skip(313),
2118                RowSelector::select(1),
2119                RowSelector::skip(709),
2120                RowSelector::select(1),
2121            ]),
2122            RowSelection::from(vec![
2123                RowSelector::skip(255),
2124                RowSelector::select(1),
2125                RowSelector::skip(767),
2126                RowSelector::select(1),
2127            ]),
2128            RowSelection::from(vec![
2129                RowSelector::select(255),
2130                RowSelector::skip(1),
2131                RowSelector::select(767),
2132                RowSelector::skip(1),
2133            ]),
2134            RowSelection::from(vec![
2135                RowSelector::skip(254),
2136                RowSelector::select(1),
2137                RowSelector::select(1),
2138                RowSelector::skip(767),
2139                RowSelector::select(1),
2140            ]),
2141        ];
2142
2143        for selection in selections {
2144            let expected = selection.row_count();
2145            // Read data
2146            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2147                tokio::fs::File::from_std(file.try_clone().unwrap()),
2148                ArrowReaderOptions::new().with_page_index(true),
2149            )
2150            .await
2151            .unwrap();
2152
2153            reader = reader.with_row_selection(selection);
2154
2155            let mut stream = reader.build().unwrap();
2156
2157            let mut total_rows = 0;
2158            while let Some(rb) = stream.next().await {
2159                let rb = rb.unwrap();
2160                total_rows += rb.num_rows();
2161            }
2162            assert_eq!(total_rows, expected);
2163        }
2164    }
2165
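    // Like test_row_filter, but the second predicate reads a leaf inside a struct
    // column, exercising nested projections in row filters.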
2166    #[tokio::test]
2167    async fn test_row_filter_nested() {
2168        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2169        let b = StructArray::from(vec![
2170            (
2171                Arc::new(Field::new("aa", DataType::Utf8, true)),
2172                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2173            ),
2174            (
2175                Arc::new(Field::new("bb", DataType::Utf8, true)),
2176                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2177            ),
2178        ]);
2179        let c = Int32Array::from_iter(0..6);
2180        let data = RecordBatch::try_from_iter([
2181            ("a", Arc::new(a) as ArrayRef),
2182            ("b", Arc::new(b) as ArrayRef),
2183            ("c", Arc::new(c) as ArrayRef),
2184        ])
2185        .unwrap();
2186
2187        let mut buf = Vec::with_capacity(1024);
2188        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2189        writer.write(&data).unwrap();
2190        writer.close().unwrap();
2191
2192        let data: Bytes = buf.into();
2193        let metadata = ParquetMetaDataReader::new()
2194            .parse_and_finish(&data)
2195            .unwrap();
2196        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2197
2198        let test = TestReader {
2199            data,
2200            metadata: Default::default(),
2201            requests: Default::default(),
2202        };
2203        let requests = test.requests.clone();
2204
2205        let a_scalar = StringArray::from_iter_values(["b"]);
2206        let a_filter = ArrowPredicateFn::new(
2207            ProjectionMask::leaves(&parquet_schema, vec![0]),
2208            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2209        );
2210
2211        let b_scalar = StringArray::from_iter_values(["4"]);
2212        let b_filter = ArrowPredicateFn::new(
2213            ProjectionMask::leaves(&parquet_schema, vec![2]),
2214            move |batch| {
2215                // Filter on the second field of the struct ("bb"), which is column 0 of this projected batch.
2216                let struct_array = batch
2217                    .column(0)
2218                    .as_any()
2219                    .downcast_ref::<StructArray>()
2220                    .unwrap();
2221                eq(struct_array.column(0), &Scalar::new(&b_scalar))
2222            },
2223        );
2224
2225        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2226
2227        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2228        let stream = ParquetRecordBatchStreamBuilder::new(test)
2229            .await
2230            .unwrap()
2231            .with_projection(mask.clone())
2232            .with_batch_size(1024)
2233            .with_row_filter(filter)
2234            .build()
2235            .unwrap();
2236
2237        let batches: Vec<_> = stream.try_collect().await.unwrap();
2238        assert_eq!(batches.len(), 1);
2239
2240        let batch = &batches[0];
2241        assert_eq!(batch.num_rows(), 1);
2242        assert_eq!(batch.num_columns(), 2);
2243
2244        let col = batch.column(0);
2245        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2246        assert_eq!(val, "b");
2247
2248        let col = batch.column(1);
2249        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2250        assert_eq!(val, 3);
2251
2252        // Should only have made 3 requests
2253        assert_eq!(requests.lock().unwrap().len(), 3);
2254    }
2255
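    // Regression test: an explicitly empty offset index must not cause a panic
    // while reading row groups.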
2256    #[tokio::test]
2257    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2258        use tokio::fs::File;
2259        let testdata = arrow::util::test_util::parquet_test_data();
2260        let path = format!("{testdata}/alltypes_plain.parquet");
2261        let mut file = File::open(&path).await.unwrap();
2262        let file_size = file.metadata().await.unwrap().len();
2263        let mut metadata = ParquetMetaDataReader::new()
2264            .with_page_indexes(true)
2265            .load_and_finish(&mut file, file_size)
2266            .await
2267            .unwrap();
2268
2269        metadata.set_offset_index(Some(vec![]));
2270        let options = ArrowReaderOptions::new().with_page_index(true);
2271        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2272        let reader =
2273            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2274                .build()
2275                .unwrap();
2276
2277        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2278        assert_eq!(result.len(), 1);
2279    }
2280
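    // Companion to the test above: a populated offset index loaded from the file
    // should also be read without panicking.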
2281    #[tokio::test]
2282    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2283        use tokio::fs::File;
2284        let testdata = arrow::util::test_util::parquet_test_data();
2285        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2286        let mut file = File::open(&path).await.unwrap();
2287        let file_size = file.metadata().await.unwrap().len();
2288        let metadata = ParquetMetaDataReader::new()
2289            .with_page_indexes(true)
2290            .load_and_finish(&mut file, file_size)
2291            .await
2292            .unwrap();
2293
2294        let options = ArrowReaderOptions::new().with_page_index(true);
2295        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2296        let reader =
2297            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2298                .build()
2299                .unwrap();
2300
2301        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2302        assert_eq!(result.len(), 8);
2303    }
2304
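    // Regression test: metadata written with ParquetMetaDataWriter and read back
    // from disk must not cause a panic when the stream reads column chunks.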
2305    #[tokio::test]
2306    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2307        use tempfile::TempDir;
2308        use tokio::fs::File;
2309        fn write_metadata_to_local_file(
2310            metadata: ParquetMetaData,
2311            file: impl AsRef<std::path::Path>,
2312        ) {
2313            use crate::file::metadata::ParquetMetaDataWriter;
2314            use std::fs::File;
2315            let file = File::create(file).unwrap();
2316            ParquetMetaDataWriter::new(file, &metadata)
2317                .finish()
2318                .unwrap()
2319        }
2320
2321        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2322            use std::fs::File;
2323            let file = File::open(file).unwrap();
2324            ParquetMetaDataReader::new()
2325                .with_page_indexes(true)
2326                .parse_and_finish(&file)
2327                .unwrap()
2328        }
2329
2330        let testdata = arrow::util::test_util::parquet_test_data();
2331        let path = format!("{testdata}/alltypes_plain.parquet");
2332        let mut file = File::open(&path).await.unwrap();
2333        let file_size = file.metadata().await.unwrap().len();
2334        let metadata = ParquetMetaDataReader::new()
2335            .with_page_indexes(true)
2336            .load_and_finish(&mut file, file_size)
2337            .await
2338            .unwrap();
2339
2340        let tempdir = TempDir::new().unwrap();
2341        let metadata_path = tempdir.path().join("thrift_metadata.dat");
2342        write_metadata_to_local_file(metadata, &metadata_path);
2343        let metadata = read_metadata_from_local_file(&metadata_path);
2344
2345        let options = ArrowReaderOptions::new().with_page_index(true);
2346        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2347        let reader =
2348            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2349                .build()
2350                .unwrap();
2351
2352        // This previously panicked; collecting the stream should now succeed
2353        let result = reader.try_collect::<Vec<_>>().await.unwrap();
2354        assert_eq!(result.len(), 1);
2355    }
2356}