// parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
// Re-exported so integration tests and benchmarks can construct and inspect read plans.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
/// Default batch size (in rows) for reading parquet files into [`RecordBatch`]es,
/// used unless overridden via `ArrowReaderBuilder::with_batch_size`
pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
63/// Builder for constructing Parquet readers that decode into [Apache Arrow]
64/// arrays.
65///
66/// Most users should use one of the following specializations:
67///
68/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
69/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
70/// * decoder API: [`ParquetPushDecoderBuilder`]
71///
72/// # Features
73/// * Projection pushdown: [`Self::with_projection`]
74/// * Cached metadata: [`ArrowReaderMetadata::load`]
75/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
76/// * Row group filtering: [`Self::with_row_groups`]
77/// * Range filtering: [`Self::with_row_selection`]
78/// * Row level filtering: [`Self::with_row_filter`]
79///
80/// # Implementing Predicate Pushdown
81///
82/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
83/// process, which is efficient and allows the most low level optimizations.
84///
85/// However, most Parquet based systems will apply filters at many steps prior
86/// to decoding such as pruning files, row groups and data pages. This crate
87/// provides the low level APIs needed to implement such filtering, but does not
88/// include any logic to actually evaluate predicates. For example:
89///
90/// * [`Self::with_row_groups`] for Row Group pruning
91/// * [`Self::with_row_selection`] for data page pruning
92/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
93///
94/// The rationale for this design is that implementing predicate pushdown is a
95/// complex topic and varies significantly from system to system. For example
96///
97/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
98/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
99/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
100/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
101///
102/// You can read more about this design in the [Querying Parquet with
103/// Millisecond Latency] Arrow blog post.
104///
105/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
106/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
107/// [Apache Arrow]: https://arrow.apache.org/
108/// [`StatisticsConverter`]: statistics::StatisticsConverter
109/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
    /// The "input" to read parquet data from.
    ///
    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
    ///
    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
    pub(crate) input: T,

    /// Decoded [`ParquetMetaData`] for the file being read
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema of the output (derived from, or supplied for, the file)
    pub(crate) schema: SchemaRef,

    /// Parquet-to-Arrow field mapping, if any (taken from [`ArrowReaderMetadata`])
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Number of rows per produced [`RecordBatch`], see [`Self::with_batch_size`]
    pub(crate) batch_size: usize,

    /// Row group indexes to read, or `None` to read all row groups,
    /// see [`Self::with_row_groups`]
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Columns to decode (defaults to all), see [`Self::with_projection`]
    pub(crate) projection: ProjectionMask,

    /// Optional row-level filter applied during decoding, see [`Self::with_row_filter`]
    pub(crate) filter: Option<RowFilter>,

    /// Optional pre-computed selection of rows to decode, see [`Self::with_row_selection`]
    pub(crate) selection: Option<RowSelection>,

    /// Controls how row selections are materialised during execution,
    /// see [`Self::with_row_selection_policy`]
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Maximum number of rows to return, applied after selection and filtering
    pub(crate) limit: Option<usize>,

    /// Number of rows to skip, applied after selection and filtering
    pub(crate) offset: Option<usize>,

    /// Metrics collector (disabled by default), see [`Self::with_metrics`]
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum size in bytes (per row group) of the predicate cache used by the
    /// async decoder, see [`Self::with_max_predicate_cache_size`]
    pub(crate) max_predicate_cache_size: usize,
}
146
147impl<T: Debug> Debug for ArrowReaderBuilder<T> {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("ArrowReaderBuilder<T>")
150            .field("input", &self.input)
151            .field("metadata", &self.metadata)
152            .field("schema", &self.schema)
153            .field("fields", &self.fields)
154            .field("batch_size", &self.batch_size)
155            .field("row_groups", &self.row_groups)
156            .field("projection", &self.projection)
157            .field("filter", &self.filter)
158            .field("selection", &self.selection)
159            .field("row_selection_policy", &self.row_selection_policy)
160            .field("limit", &self.limit)
161            .field("offset", &self.offset)
162            .field("metrics", &self.metrics)
163            .finish()
164    }
165}
166
167impl<T> ArrowReaderBuilder<T> {
168    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
169        Self {
170            input,
171            metadata: metadata.metadata,
172            schema: metadata.schema,
173            fields: metadata.fields,
174            batch_size: DEFAULT_BATCH_SIZE,
175            row_groups: None,
176            projection: ProjectionMask::all(),
177            filter: None,
178            selection: None,
179            row_selection_policy: RowSelectionPolicy::default(),
180            limit: None,
181            offset: None,
182            metrics: ArrowReaderMetrics::Disabled,
183            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
184        }
185    }
186
187    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
188    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
189        &self.metadata
190    }
191
192    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
193    pub fn parquet_schema(&self) -> &SchemaDescriptor {
194        self.metadata.file_metadata().schema_descr()
195    }
196
197    /// Returns the arrow [`SchemaRef`] for this parquet file
198    pub fn schema(&self) -> &SchemaRef {
199        &self.schema
200    }
201
202    /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`]
203    /// If the batch_size more than the file row count, use the file row count.
204    pub fn with_batch_size(self, batch_size: usize) -> Self {
205        // Try to avoid allocate large buffer
206        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
207        Self { batch_size, ..self }
208    }
209
210    /// Only read data from the provided row group indexes
211    ///
212    /// This is also called row group filtering
213    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
214        Self {
215            row_groups: Some(row_groups),
216            ..self
217        }
218    }
219
220    /// Only read data from the provided column indexes
221    pub fn with_projection(self, mask: ProjectionMask) -> Self {
222        Self {
223            projection: mask,
224            ..self
225        }
226    }
227
228    /// Configure how row selections should be materialised during execution
229    ///
230    /// See [`RowSelectionPolicy`] for more details
231    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
232        Self {
233            row_selection_policy: policy,
234            ..self
235        }
236    }
237
238    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
239    /// data into memory.
240    ///
241    /// This feature is used to restrict which rows are decoded within row
242    /// groups, skipping ranges of rows that are not needed. Such selections
243    /// could be determined by evaluating predicates against the parquet page
244    /// [`Index`] or some other external information available to a query
245    /// engine.
246    ///
247    /// # Notes
248    ///
249    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
250    /// applying the row selection, and therefore rows from skipped row groups
251    /// should not be included in the [`RowSelection`] (see example below)
252    ///
253    /// It is recommended to enable writing the page index if using this
254    /// functionality, to allow more efficient skipping over data pages. See
255    /// [`ArrowReaderOptions::with_page_index`].
256    ///
257    /// # Example
258    ///
259    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
260    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
261    /// row group 3:
262    ///
263    /// ```text
264    ///   Row Group 0, 1000 rows (selected)
265    ///   Row Group 1, 1000 rows (skipped)
266    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
267    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
268    /// ```
269    ///
270    /// You could pass the following [`RowSelection`]:
271    ///
272    /// ```text
273    ///  Select 1000    (scan all rows in row group 0)
274    ///  Skip 50        (skip the first 50 rows in row group 2)
275    ///  Select 50      (scan rows 50-100 in row group 2)
276    ///  Skip 900       (skip the remaining rows in row group 2)
277    ///  Skip 200       (skip the first 200 rows in row group 3)
278    ///  Select 100     (scan rows 200-300 in row group 3)
279    ///  Skip 700       (skip the remaining rows in row group 3)
280    /// ```
281    /// Note there is no entry for the (entirely) skipped row group 1.
282    ///
283    /// Note you can represent the same selection with fewer entries. Instead of
284    ///
285    /// ```text
286    ///  Skip 900       (skip the remaining rows in row group 2)
287    ///  Skip 200       (skip the first 200 rows in row group 3)
288    /// ```
289    ///
290    /// you could use
291    ///
292    /// ```text
293    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
294    /// ```
295    ///
296    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
297    pub fn with_row_selection(self, selection: RowSelection) -> Self {
298        Self {
299            selection: Some(selection),
300            ..self
301        }
302    }
303
304    /// Provide a [`RowFilter`] to skip decoding rows
305    ///
306    /// Row filters are applied after row group selection and row selection
307    ///
308    /// It is recommended to enable reading the page index if using this functionality, to allow
309    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
310    ///
311    /// See the [blog post on late materialization] for a more technical explanation.
312    ///
313    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
314    ///
315    /// # Example
316    /// ```rust
317    /// # use std::fs::File;
318    /// # use arrow_array::Int32Array;
319    /// # use parquet::arrow::ProjectionMask;
320    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
321    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
322    /// # let testdata = arrow::util::test_util::parquet_test_data();
323    /// # let path = format!("{testdata}/alltypes_plain.parquet");
324    /// # let file = File::open(&path)?;
325    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
326    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
327    /// // Create predicate that evaluates `int_col != 1`.
328    /// // `int_col` column has index 4 (zero based) in the schema
329    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
330    /// // Only the projection columns are passed to the predicate so
331    /// // int_col is column 0 in the predicate
332    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
333    ///     let int_col = batch.column(0);
334    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
335    /// });
336    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
337    /// // The filter will be invoked during the reading process
338    /// let reader = builder.with_row_filter(row_filter).build()?;
339    /// # for b in reader { let _ = b?; }
340    /// # Ok(())
341    /// # }
342    /// ```
343    pub fn with_row_filter(self, filter: RowFilter) -> Self {
344        Self {
345            filter: Some(filter),
346            ..self
347        }
348    }
349
350    /// Provide a limit to the number of rows to be read
351    ///
352    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
353    /// allowing it to limit the final set of rows decoded after any pushed down predicates
354    ///
355    /// It is recommended to enable reading the page index if using this functionality, to allow
356    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
357    pub fn with_limit(self, limit: usize) -> Self {
358        Self {
359            limit: Some(limit),
360            ..self
361        }
362    }
363
364    /// Provide an offset to skip over the given number of rows
365    ///
366    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
367    /// allowing it to skip rows after any pushed down predicates
368    ///
369    /// It is recommended to enable reading the page index if using this functionality, to allow
370    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
371    pub fn with_offset(self, offset: usize) -> Self {
372        Self {
373            offset: Some(offset),
374            ..self
375        }
376    }
377
378    /// Specify metrics collection during reading
379    ///
380    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
381    /// clone of the provided metrics to the builder.
382    ///
383    /// For example:
384    ///
385    /// ```rust
386    /// # use std::sync::Arc;
387    /// # use bytes::Bytes;
388    /// # use arrow_array::{Int32Array, RecordBatch};
389    /// # use arrow_schema::{DataType, Field, Schema};
390    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
391    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
392    /// # use parquet::arrow::ArrowWriter;
393    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
394    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
395    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
396    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
397    /// # writer.write(&batch).unwrap();
398    /// # writer.close().unwrap();
399    /// # let file = Bytes::from(file);
400    /// // Create metrics object to pass into the reader
401    /// let metrics = ArrowReaderMetrics::enabled();
402    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
403    ///   // Configure the builder to use the metrics by passing a clone
404    ///   .with_metrics(metrics.clone())
405    ///   // Build the reader
406    ///   .build().unwrap();
407    /// // .. read data from the reader ..
408    ///
409    /// // check the metrics
410    /// assert!(metrics.records_read_from_inner().is_some());
411    /// ```
412    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
413        Self { metrics, ..self }
414    }
415
416    /// Set the maximum size (per row group) of the predicate cache in bytes for
417    /// the async decoder.
418    ///
419    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
420    /// unlimited cache size.
421    ///
422    /// This cache is used to store decoded arrays that are used in
423    /// predicate evaluation ([`Self::with_row_filter`]).
424    ///
425    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
426    /// [this ticket] for more details and alternatives.
427    ///
428    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
429    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
430    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
431        Self {
432            max_predicate_cache_size,
433            ..self
434        }
435    }
436}
437
438/// Options that control how [`ParquetMetaData`] is read when constructing
439/// an Arrow reader.
440///
441/// To use these options, pass them to one of the following methods:
442/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
443/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
444///
445/// For fine-grained control over metadata loading, use
446/// [`ArrowReaderMetadata::load`] to load metadata with these options,
447///
448/// See [`ArrowReaderBuilder`] for how to configure how the column data
449/// is then read from the file, including projection and filter pushdown
450///
451/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
    ///
    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
    supplied_schema: Option<SchemaRef>,

    /// Policy for reading the Parquet `ColumnIndex` (page-level statistics)
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the Parquet `OffsetIndex` (page locations and sizes)
    pub(crate) offset_index: PageIndexPolicy,

    /// Options to control reading of Parquet metadata
    metadata_options: ParquetMetaDataOptions,
    /// If encryption is enabled, the file decryption properties can be provided
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Reader-generated "virtual" columns (e.g. row numbers) to include in the
    /// output in addition to the columns stored in the file
    virtual_columns: Vec<FieldRef>,
}
473
474impl ArrowReaderOptions {
475    /// Create a new [`ArrowReaderOptions`] with the default settings
476    pub fn new() -> Self {
477        Self::default()
478    }
479
480    /// Skip decoding the embedded arrow metadata (defaults to `false`)
481    ///
482    /// Parquet files generated by some writers may contain embedded arrow
483    /// schema and metadata.
484    /// This may not be correct or compatible with your system,
485    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
486    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
487        Self {
488            skip_arrow_metadata,
489            ..self
490        }
491    }
492
493    /// Provide a schema hint to use when reading the Parquet file.
494    ///
495    /// If provided, this schema takes precedence over any arrow schema embedded
496    /// in the metadata (see the [`arrow`] documentation for more details).
497    ///
498    /// If the provided schema is not compatible with the data stored in the
499    /// parquet file schema, an error will be returned when constructing the
500    /// builder.
501    ///
502    /// This option is only required if you want to explicitly control the
503    /// conversion of Parquet types to Arrow types, such as casting a column to
504    /// a different type. For example, if you wanted to read an Int64 in
505    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
506    ///
507    /// [`arrow`]: crate::arrow
508    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
509    ///
510    /// # Notes
511    ///
512    /// The provided schema must have the same number of columns as the parquet schema and
513    /// the column names must be the same.
514    ///
515    /// # Example
516    /// ```
517    /// # use std::sync::Arc;
518    /// # use bytes::Bytes;
519    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
520    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
521    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
522    /// # use parquet::arrow::ArrowWriter;
523    /// // Write data - schema is inferred from the data to be Int32
524    /// let mut file = Vec::new();
525    /// let batch = RecordBatch::try_from_iter(vec![
526    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
527    /// ]).unwrap();
528    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
529    /// writer.write(&batch).unwrap();
530    /// writer.close().unwrap();
531    /// let file = Bytes::from(file);
532    ///
533    /// // Read the file back.
534    /// // Supply a schema that interprets the Int32 column as a Timestamp.
535    /// let supplied_schema = Arc::new(Schema::new(vec![
536    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
537    /// ]));
538    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
539    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
540    ///     file.clone(),
541    ///     options
542    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
543    ///
544    /// // Create the reader and read the data using the supplied schema.
545    /// let mut reader = builder.build().unwrap();
546    /// let _batch = reader.next().unwrap().unwrap();
547    /// ```
548    ///
549    /// # Example: Preserving Dictionary Encoding
550    ///
551    /// By default, Parquet string columns are read as `Utf8Array` (or `LargeUtf8Array`),
552    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
553    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
554    ///
555    /// ```
556    /// # use std::sync::Arc;
557    /// # use bytes::Bytes;
558    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
559    /// # use arrow_schema::{DataType, Field, Schema};
560    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
561    /// # use parquet::arrow::ArrowWriter;
562    /// // Write a Parquet file with string data
563    /// let mut file = Vec::new();
564    /// let schema = Arc::new(Schema::new(vec![
565    ///     Field::new("city", DataType::Utf8, false)
566    /// ]));
567    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
568    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
569    ///
570    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
571    /// writer.write(&batch).unwrap();
572    /// writer.close().unwrap();
573    /// let file = Bytes::from(file);
574    ///
575    /// // Read the file back, requesting dictionary encoding preservation
576    /// let dict_schema = Arc::new(Schema::new(vec![
577    ///     Field::new("city", DataType::Dictionary(
578    ///         Box::new(DataType::Int32),
579    ///         Box::new(DataType::Utf8)
580    ///     ), false)
581    /// ]));
582    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
583    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
584    ///     file.clone(),
585    ///     options
586    /// ).unwrap();
587    ///
588    /// let mut reader = builder.build().unwrap();
589    /// let batch = reader.next().unwrap().unwrap();
590    ///
591    /// // The column is now a DictionaryArray
592    /// assert!(matches!(
593    ///     batch.column(0).data_type(),
594    ///     DataType::Dictionary(_, _)
595    /// ));
596    /// ```
597    ///
598    /// **Note**: Dictionary encoding preservation works best when:
599    /// 1. The original column was dictionary encoded (the default for string columns)
600    /// 2. There are a small number of distinct values
601    pub fn with_schema(self, schema: SchemaRef) -> Self {
602        Self {
603            supplied_schema: Some(schema),
604            skip_arrow_metadata: true,
605            ..self
606        }
607    }
608
609    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
610    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
611    ///
612    /// The `PageIndex` can be used to push down predicates to the parquet scan,
613    /// potentially eliminating unnecessary IO, by some query engines.
614    ///
615    /// If this is enabled, [`ParquetMetaData::column_index`] and
616    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
617    /// information is present in the file.
618    ///
619    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
620    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
621    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
622    pub fn with_page_index(self, page_index: bool) -> Self {
623        self.with_page_index_policy(PageIndexPolicy::from(page_index))
624    }
625
626    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
627    ///
628    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
629    /// This method sets the same policy for both. For fine-grained control, use
630    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
631    ///
632    /// See [`Self::with_page_index`] for more details on page indexes.
633    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
634        self.with_column_index_policy(policy)
635            .with_offset_index_policy(policy)
636    }
637
638    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
639    ///
640    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
641    /// for predicate pushdown and page-level pruning.
642    ///
643    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
644    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
645        self.column_index = policy;
646        self
647    }
648
649    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
650    ///
651    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
652    /// efficient page-level skipping and random access within column chunks.
653    ///
654    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
655    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
656        self.offset_index = policy;
657        self
658    }
659
660    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
661    /// footer will be skipped.
662    ///
663    /// This can be used to avoid reparsing the schema from the file when it is
664    /// already known.
665    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
666        self.metadata_options.set_schema(schema);
667        self
668    }
669
670    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
671    /// (defaults to `false`).
672    ///
673    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
674    /// might be desirable.
675    ///
676    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
677    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
678    /// [`encoding_stats`]:
679    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
680    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
681        self.metadata_options.set_encoding_stats_as_mask(val);
682        self
683    }
684
685    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
686    ///
687    /// [`encoding_stats`]:
688    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
689    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
690        self.metadata_options.set_encoding_stats_policy(policy);
691        self
692    }
693
694    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
695    ///
696    /// [`statistics`]:
697    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
698    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
699        self.metadata_options.set_column_stats_policy(policy);
700        self
701    }
702
703    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
704    ///
705    /// [`size_statistics`]:
706    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
707    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
708        self.metadata_options.set_size_stats_policy(policy);
709        self
710    }
711
712    /// Provide the file decryption properties to use when reading encrypted parquet files.
713    ///
714    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
715    #[cfg(feature = "encryption")]
716    pub fn with_file_decryption_properties(
717        self,
718        file_decryption_properties: Arc<FileDecryptionProperties>,
719    ) -> Self {
720        Self {
721            file_decryption_properties: Some(file_decryption_properties),
722            ..self
723        }
724    }
725
726    /// Include virtual columns in the output.
727    ///
728    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
729    ///
730    /// # Example
731    /// ```
732    /// # use std::sync::Arc;
733    /// # use bytes::Bytes;
734    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
735    /// # use arrow_schema::{DataType, Field, Schema};
736    /// # use parquet::arrow::{ArrowWriter, RowNumber};
737    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
738    /// #
739    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
740    /// // Create a simple record batch with some data
741    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
742    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
743    ///
744    /// // Write the batch to an in-memory buffer
745    /// let mut file = Vec::new();
746    /// let mut writer = ArrowWriter::try_new(
747    ///     &mut file,
748    ///     batch.schema(),
749    ///     None
750    /// )?;
751    /// writer.write(&batch)?;
752    /// writer.close()?;
753    /// let file = Bytes::from(file);
754    ///
755    /// // Create a virtual column for row numbers
756    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
757    ///     .with_extension_type(RowNumber));
758    ///
759    /// // Configure options with virtual columns
760    /// let options = ArrowReaderOptions::new()
761    ///     .with_virtual_columns(vec![row_number_field])?;
762    ///
763    /// // Create a reader with the options
764    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
765    ///     file,
766    ///     options
767    /// )?
768    /// .build()?;
769    ///
770    /// // Read the batch - it will include both the original column and the virtual row_number column
771    /// let result_batch = reader.next().unwrap()?;
772    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
773    /// assert_eq!(result_batch.num_rows(), 3);
774    /// #
775    /// # Ok(())
776    /// # }
777    /// ```
778    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
779        // Validate that all fields are virtual columns
780        for field in &virtual_columns {
781            if !is_virtual_column(field) {
782                return Err(ParquetError::General(format!(
783                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
784                    field.name()
785                )));
786            }
787        }
788        Ok(Self {
789            virtual_columns,
790            ..self
791        })
792    }
793
794    #[deprecated(
795        since = "57.2.0",
796        note = "Use `column_index_policy` or `offset_index_policy` instead"
797    )]
798    /// Returns whether page index reading is enabled.
799    ///
800    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
801    ///
802    /// This can be set via [`with_page_index`][Self::with_page_index] or
803    /// [`with_page_index_policy`][Self::with_page_index_policy].
804    pub fn page_index(&self) -> bool {
805        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
806    }
807
    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
    ///
    /// This is a simple accessor of the stored policy; it performs no I/O.
    ///
    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }
815
    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
    ///
    /// This is a simple accessor of the stored policy; it performs no I/O.
    ///
    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }
823
    /// Retrieve the currently set metadata decoding options.
    ///
    /// These are populated by setters such as
    /// [`with_parquet_schema`][Self::with_parquet_schema] and the various
    /// `with_*_policy` / `with_encoding_stats_as_mask` methods.
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }
828
    /// Retrieve the currently set file decryption properties.
    ///
    /// Returns `None` if no decryption properties have been configured.
    ///
    /// This can be set via
    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
837}
838
/// The metadata necessary to construct a [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several arcs.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known a priori (e.g. from a cache)
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema (either derived from the Parquet schema or supplied by the caller)
    pub(crate) schema: SchemaRef,
    /// The Parquet schema mapped to Arrow (root field), if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}
862
863impl ArrowReaderMetadata {
864    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
865    /// and [`ChunkReader`]
866    ///
867    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
868    /// example of how this can be used
869    ///
870    /// # Notes
871    ///
872    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
873    /// `Self::metadata` is missing the page index, this function will attempt
874    /// to load the page index by making an object store request.
875    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
876        let metadata = ParquetMetaDataReader::new()
877            .with_column_index_policy(options.column_index)
878            .with_offset_index_policy(options.offset_index)
879            .with_metadata_options(Some(options.metadata_options.clone()));
880        #[cfg(feature = "encryption")]
881        let metadata = metadata.with_decryption_properties(
882            options.file_decryption_properties.as_ref().map(Arc::clone),
883        );
884        let metadata = metadata.parse_and_finish(reader)?;
885        Self::try_new(Arc::new(metadata), options)
886    }
887
888    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
889    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
890    ///
891    /// # Notes
892    ///
893    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
894    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
895    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
896        match options.supplied_schema {
897            Some(supplied_schema) => Self::with_supplied_schema(
898                metadata,
899                supplied_schema.clone(),
900                &options.virtual_columns,
901            ),
902            None => {
903                let kv_metadata = match options.skip_arrow_metadata {
904                    true => None,
905                    false => metadata.file_metadata().key_value_metadata(),
906                };
907
908                let (schema, fields) = parquet_to_arrow_schema_and_fields(
909                    metadata.file_metadata().schema_descr(),
910                    ProjectionMask::all(),
911                    kv_metadata,
912                    &options.virtual_columns,
913                )?;
914
915                Ok(Self {
916                    metadata,
917                    schema: Arc::new(schema),
918                    fields: fields.map(Arc::new),
919                })
920            }
921        }
922    }
923
924    fn with_supplied_schema(
925        metadata: Arc<ParquetMetaData>,
926        supplied_schema: SchemaRef,
927        virtual_columns: &[FieldRef],
928    ) -> Result<Self> {
929        let parquet_schema = metadata.file_metadata().schema_descr();
930        let field_levels = parquet_to_arrow_field_levels_with_virtual(
931            parquet_schema,
932            ProjectionMask::all(),
933            Some(supplied_schema.fields()),
934            virtual_columns,
935        )?;
936        let fields = field_levels.fields;
937        let inferred_len = fields.len();
938        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
939        // Ensure the supplied schema has the same number of columns as the parquet schema.
940        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
941        // different lengths, but we check here to be safe.
942        if inferred_len != supplied_len {
943            return Err(arrow_err!(format!(
944                "Incompatible supplied Arrow schema: expected {} columns received {}",
945                inferred_len, supplied_len
946            )));
947        }
948
949        let mut errors = Vec::new();
950
951        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
952
953        for (field1, field2) in field_iter {
954            if field1.data_type() != field2.data_type() {
955                errors.push(format!(
956                    "data type mismatch for field {}: requested {} but found {}",
957                    field1.name(),
958                    field1.data_type(),
959                    field2.data_type()
960                ));
961            }
962            if field1.is_nullable() != field2.is_nullable() {
963                errors.push(format!(
964                    "nullability mismatch for field {}: expected {:?} but found {:?}",
965                    field1.name(),
966                    field1.is_nullable(),
967                    field2.is_nullable()
968                ));
969            }
970            if field1.metadata() != field2.metadata() {
971                errors.push(format!(
972                    "metadata mismatch for field {}: expected {:?} but found {:?}",
973                    field1.name(),
974                    field1.metadata(),
975                    field2.metadata()
976                ));
977            }
978        }
979
980        if !errors.is_empty() {
981            let message = errors.join(", ");
982            return Err(ParquetError::ArrowError(format!(
983                "Incompatible supplied Arrow schema: {message}",
984            )));
985        }
986
987        Ok(Self {
988            metadata,
989            schema: supplied_schema,
990            fields: field_levels.levels.map(Arc::new),
991        })
992    }
993
994    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
995    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
996        &self.metadata
997    }
998
999    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
1000    pub fn parquet_schema(&self) -> &SchemaDescriptor {
1001        self.metadata.file_metadata().schema_descr()
1002    }
1003
1004    /// Returns the arrow [`SchemaRef`] for this parquet file
1005    pub fn schema(&self) -> &SchemaRef {
1006        &self.schema
1007    }
1008}
1009
#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` (see `ParquetRecordBatchReaderBuilder`)
// to distinguish sync readers from async ones at the type level
pub struct SyncReader<T: ChunkReader>(T);
1013
1014impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1015    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1016        f.debug_tuple("SyncReader").field(&self.0).finish()
1017    }
1018}
1019
/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
///
/// `T` is any [`ChunkReader`], e.g. a `File` or in-memory `Bytes`.
///
/// # See Also
/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
/// * [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1027
1028impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1029    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1030    ///
1031    /// ```
1032    /// # use std::sync::Arc;
1033    /// # use bytes::Bytes;
1034    /// # use arrow_array::{Int32Array, RecordBatch};
1035    /// # use arrow_schema::{DataType, Field, Schema};
1036    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1037    /// # use parquet::arrow::ArrowWriter;
1038    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1039    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1040    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1041    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1042    /// # writer.write(&batch).unwrap();
1043    /// # writer.close().unwrap();
1044    /// # let file = Bytes::from(file);
1045    /// // Build the reader from anything that implements `ChunkReader`
1046    /// // such as a `File`, or `Bytes`
1047    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1048    /// // The builder has access to ParquetMetaData such
1049    /// // as the number and layout of row groups
1050    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1051    /// // Call build to create the reader
1052    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1053    /// // Read data
1054    /// while let Some(batch) = reader.next().transpose()? {
1055    ///     println!("Read {} rows", batch.num_rows());
1056    /// }
1057    /// # Ok::<(), parquet::errors::ParquetError>(())
1058    /// ```
1059    pub fn try_new(reader: T) -> Result<Self> {
1060        Self::try_new_with_options(reader, Default::default())
1061    }
1062
1063    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1064    ///
1065    /// Use this method if you want to control the options for reading the
1066    /// [`ParquetMetaData`]
1067    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1068        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1069        Ok(Self::new_with_metadata(reader, metadata))
1070    }
1071
1072    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1073    ///
1074    /// Use this method if you already have [`ParquetMetaData`] for a file.
1075    /// This interface allows:
1076    ///
1077    /// 1. Loading metadata once and using it to create multiple builders with
1078    ///    potentially different settings or run on different threads
1079    ///
1080    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1081    ///    file each time a reader is constructed.
1082    ///
1083    /// See the docs on [`ArrowReaderMetadata`] for more details
1084    ///
1085    /// # Example
1086    /// ```
1087    /// # use std::fs::metadata;
1088    /// # use std::sync::Arc;
1089    /// # use bytes::Bytes;
1090    /// # use arrow_array::{Int32Array, RecordBatch};
1091    /// # use arrow_schema::{DataType, Field, Schema};
1092    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1093    /// # use parquet::arrow::ArrowWriter;
1094    /// #
1095    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1096    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1097    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1098    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1099    /// # writer.write(&batch).unwrap();
1100    /// # writer.close().unwrap();
1101    /// # let file = Bytes::from(file);
1102    /// #
1103    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1104    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1105    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1106    ///
1107    /// // Should be able to read from both in parallel
1108    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1109    /// ```
1110    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1111        Self::new_builder(SyncReader(input), metadata)
1112    }
1113
1114    /// Read bloom filter for a column in a row group
1115    ///
1116    /// Returns `None` if the column does not have a bloom filter
1117    ///
1118    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
1119    pub fn get_row_group_column_bloom_filter(
1120        &self,
1121        row_group_idx: usize,
1122        column_idx: usize,
1123    ) -> Result<Option<Sbbf>> {
1124        let metadata = self.metadata.row_group(row_group_idx);
1125        let column_metadata = metadata.column(column_idx);
1126
1127        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1128            offset
1129                .try_into()
1130                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1131        } else {
1132            return Ok(None);
1133        };
1134
1135        let buffer = match column_metadata.bloom_filter_length() {
1136            Some(length) => self.input.0.get_bytes(offset, length as usize),
1137            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1138        }?;
1139
1140        let (header, bitset_offset) =
1141            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1142
1143        match header.algorithm {
1144            BloomFilterAlgorithm::BLOCK => {
1145                // this match exists to future proof the singleton algorithm enum
1146            }
1147        }
1148        match header.compression {
1149            BloomFilterCompression::UNCOMPRESSED => {
1150                // this match exists to future proof the singleton compression enum
1151            }
1152        }
1153        match header.hash {
1154            BloomFilterHash::XXHASH => {
1155                // this match exists to future proof the singleton hash enum
1156            }
1157        }
1158
1159        let bitset = match column_metadata.bloom_filter_length() {
1160            Some(_) => buffer.slice(
1161                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1162                    - TryInto::<usize>::try_into(offset).unwrap())..,
1163            ),
1164            None => {
1165                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1166                    ParquetError::General("Bloom filter length is invalid".to_string())
1167                })?;
1168                self.input.0.get_bytes(bitset_offset, bitset_length)?
1169            }
1170        };
1171        Ok(Some(Sbbf::new(&bitset)))
1172    }
1173
1174    /// Build a [`ParquetRecordBatchReader`]
1175    ///
1176    /// Note: this will eagerly evaluate any `RowFilter` before returning
1177    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1178        let Self {
1179            input,
1180            metadata,
1181            schema: _,
1182            fields,
1183            batch_size,
1184            row_groups,
1185            projection,
1186            mut filter,
1187            selection,
1188            row_selection_policy,
1189            limit,
1190            offset,
1191            metrics,
1192            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1193            max_predicate_cache_size: _,
1194        } = self;
1195
1196        // Try to avoid allocate large buffer
1197        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1198
1199        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1200
1201        let reader = ReaderRowGroups {
1202            reader: Arc::new(input.0),
1203            metadata,
1204            row_groups,
1205        };
1206
1207        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1208            .with_selection(selection)
1209            .with_row_selection_policy(row_selection_policy);
1210
1211        // Update selection based on any filters
1212        if let Some(filter) = filter.as_mut() {
1213            for predicate in filter.predicates.iter_mut() {
1214                // break early if we have ruled out all rows
1215                if !plan_builder.selects_any() {
1216                    break;
1217                }
1218
1219                let mut cache_projection = predicate.projection().clone();
1220                cache_projection.intersect(&projection);
1221
1222                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1223                    .with_batch_size(batch_size)
1224                    .with_parquet_metadata(&reader.metadata)
1225                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1226
1227                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1228            }
1229        }
1230
1231        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1232            .with_batch_size(batch_size)
1233            .with_parquet_metadata(&reader.metadata)
1234            .build_array_reader(fields.as_deref(), &projection)?;
1235
1236        let read_plan = plan_builder
1237            .limited(reader.num_rows())
1238            .with_offset(offset)
1239            .with_limit(limit)
1240            .build_limited()
1241            .build();
1242
1243        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1244    }
1245}
1246
// A [`RowGroups`] implementation backed by a [`ChunkReader`] and a
// pre-selected subset of row groups.
struct ReaderRowGroups<T: ChunkReader> {
    // Underlying source of bytes (file, buffer, ...)
    reader: Arc<T>,

    // Parquet metadata for the entire file
    metadata: Arc<ParquetMetaData>,
    /// List of row group indices to scan (already resolved; never empty-by-default)
    row_groups: Vec<usize>,
}
1254
1255impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1256    fn num_rows(&self) -> usize {
1257        let meta = self.metadata.row_groups();
1258        self.row_groups
1259            .iter()
1260            .map(|x| meta[*x].num_rows() as usize)
1261            .sum()
1262    }
1263
1264    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1265        Ok(Box::new(ReaderPageIterator {
1266            column_idx: i,
1267            reader: self.reader.clone(),
1268            metadata: self.metadata.clone(),
1269            row_groups: self.row_groups.clone().into_iter(),
1270        }))
1271    }
1272
1273    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1274        Box::new(
1275            self.row_groups
1276                .iter()
1277                .map(move |i| self.metadata.row_group(*i)),
1278        )
1279    }
1280
1281    fn metadata(&self) -> &ParquetMetaData {
1282        self.metadata.as_ref()
1283    }
1284}
1285
// Iterates over the selected row groups for a single column, yielding one
// `SerializedPageReader` per row group.
struct ReaderPageIterator<T: ChunkReader> {
    // Underlying source of bytes
    reader: Arc<T>,
    // The (leaf) column this iterator reads pages for
    column_idx: usize,
    // Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    // Parquet metadata for the entire file
    metadata: Arc<ParquetMetaData>,
}
1292
1293impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1294    /// Return the next SerializedPageReader
1295    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1296        let rg = self.metadata.row_group(rg_idx);
1297        let column_chunk_metadata = rg.column(self.column_idx);
1298        let offset_index = self.metadata.offset_index();
1299        // `offset_index` may not exist and `i[rg_idx]` will be empty.
1300        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
1301        let page_locations = offset_index
1302            .filter(|i| !i[rg_idx].is_empty())
1303            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1304        let total_rows = rg.num_rows() as usize;
1305        let reader = self.reader.clone();
1306
1307        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1308            .add_crypto_context(
1309                rg_idx,
1310                self.column_idx,
1311                self.metadata.as_ref(),
1312                column_chunk_metadata,
1313            )
1314    }
1315}
1316
1317impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1318    type Item = Result<Box<dyn PageReader>>;
1319
1320    fn next(&mut self) -> Option<Self::Item> {
1321        let rg_idx = self.row_groups.next()?;
1322        let page_reader = self
1323            .next_page_reader(rg_idx)
1324            .map(|page_reader| Box::new(page_reader) as _);
1325        Some(page_reader)
1326    }
1327}
1328
// Marker trait impl: `ReaderPageIterator` can be used wherever a `PageIterator` is required.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1330
/// Reads Parquet data as Arrow [`RecordBatch`]es
///
/// This struct implements the [`RecordBatchReader`] trait and is an
/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
///
/// Typically, either reads from a file or an in memory buffer [`Bytes`]
///
/// Created by [`ParquetRecordBatchReaderBuilder`]
///
/// [`Bytes`]: bytes::Bytes
pub struct ParquetRecordBatchReader {
    // Decodes the projected columns into Arrow arrays
    array_reader: Box<dyn ArrayReader>,
    // The Arrow schema of the emitted batches
    schema: SchemaRef,
    // Which rows to read/skip, and the batch size
    read_plan: ReadPlan,
}
1346
1347impl Debug for ParquetRecordBatchReader {
1348    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1349        f.debug_struct("ParquetRecordBatchReader")
1350            .field("array_reader", &"...")
1351            .field("schema", &self.schema)
1352            .field("read_plan", &self.read_plan)
1353            .finish()
1354    }
1355}
1356
1357impl Iterator for ParquetRecordBatchReader {
1358    type Item = Result<RecordBatch, ArrowError>;
1359
1360    fn next(&mut self) -> Option<Self::Item> {
1361        self.next_inner()
1362            .map_err(|arrow_err| arrow_err.into())
1363            .transpose()
1364    }
1365}
1366
1367impl ParquetRecordBatchReader {
1368    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1369    /// has reached the end of the file.
1370    ///
1371    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1372    /// simplify error handling with `?`
1373    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1374        let mut read_records = 0;
1375        let batch_size = self.batch_size();
1376        if batch_size == 0 {
1377            return Ok(None);
1378        }
1379        match self.read_plan.row_selection_cursor_mut() {
1380            RowSelectionCursor::Mask(mask_cursor) => {
1381                // Stream the record batch reader using contiguous segments of the selection
1382                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1383                while !mask_cursor.is_empty() {
1384                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1385                        return Ok(None);
1386                    };
1387
1388                    if mask_chunk.initial_skip > 0 {
1389                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1390                        if skipped != mask_chunk.initial_skip {
1391                            return Err(general_err!(
1392                                "failed to skip rows, expected {}, got {}",
1393                                mask_chunk.initial_skip,
1394                                skipped
1395                            ));
1396                        }
1397                    }
1398
1399                    if mask_chunk.chunk_rows == 0 {
1400                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1401                            return Ok(None);
1402                        }
1403                        continue;
1404                    }
1405
1406                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1407
1408                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1409                    if read == 0 {
1410                        return Err(general_err!(
1411                            "reached end of column while expecting {} rows",
1412                            mask_chunk.chunk_rows
1413                        ));
1414                    }
1415                    if read != mask_chunk.chunk_rows {
1416                        return Err(general_err!(
1417                            "insufficient rows read from array reader - expected {}, got {}",
1418                            mask_chunk.chunk_rows,
1419                            read
1420                        ));
1421                    }
1422
1423                    let array = self.array_reader.consume_batch()?;
1424                    // The column reader exposes the projection as a struct array; convert this
1425                    // into a record batch before applying the boolean filter mask.
1426                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1427                        ArrowError::ParquetError(
1428                            "Struct array reader should return struct array".to_string(),
1429                        )
1430                    })?;
1431
1432                    let filtered_batch =
1433                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1434
1435                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1436                        return Err(general_err!(
1437                            "filtered rows mismatch selection - expected {}, got {}",
1438                            mask_chunk.selected_rows,
1439                            filtered_batch.num_rows()
1440                        ));
1441                    }
1442
1443                    if filtered_batch.num_rows() == 0 {
1444                        continue;
1445                    }
1446
1447                    return Ok(Some(filtered_batch));
1448                }
1449            }
1450            RowSelectionCursor::Selectors(selectors_cursor) => {
1451                while read_records < batch_size && !selectors_cursor.is_empty() {
1452                    let front = selectors_cursor.next_selector();
1453                    if front.skip {
1454                        let skipped = self.array_reader.skip_records(front.row_count)?;
1455
1456                        if skipped != front.row_count {
1457                            return Err(general_err!(
1458                                "failed to skip rows, expected {}, got {}",
1459                                front.row_count,
1460                                skipped
1461                            ));
1462                        }
1463                        continue;
1464                    }
1465
1466                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
1467                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1468                    if front.row_count == 0 {
1469                        continue;
1470                    }
1471
1472                    // try to read record
1473                    let need_read = batch_size - read_records;
1474                    let to_read = match front.row_count.checked_sub(need_read) {
1475                        Some(remaining) if remaining != 0 => {
1476                            // if page row count less than batch_size we must set batch size to page row count.
1477                            // add check avoid dead loop
1478                            selectors_cursor.return_selector(RowSelector::select(remaining));
1479                            need_read
1480                        }
1481                        _ => front.row_count,
1482                    };
1483                    match self.array_reader.read_records(to_read)? {
1484                        0 => break,
1485                        rec => read_records += rec,
1486                    };
1487                }
1488            }
1489            RowSelectionCursor::All => {
1490                self.array_reader.read_records(batch_size)?;
1491            }
1492        };
1493
1494        let array = self.array_reader.consume_batch()?;
1495        let struct_array = array.as_struct_opt().ok_or_else(|| {
1496            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1497        })?;
1498
1499        Ok(if struct_array.len() > 0 {
1500            Some(RecordBatch::from(struct_array))
1501        } else {
1502            None
1503        })
1504    }
1505}
1506
1507impl RecordBatchReader for ParquetRecordBatchReader {
1508    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1509    ///
1510    /// Note that the schema metadata will be stripped here. See
1511    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1512    fn schema(&self) -> SchemaRef {
1513        self.schema.clone()
1514    }
1515}
1516
1517impl ParquetRecordBatchReader {
1518    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1519    ///
1520    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1521    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1522        ParquetRecordBatchReaderBuilder::try_new(reader)?
1523            .with_batch_size(batch_size)
1524            .build()
1525    }
1526
1527    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1528    ///
1529    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1530    /// higher-level interface for reading parquet data from a file
1531    pub fn try_new_with_row_groups(
1532        levels: &FieldLevels,
1533        row_groups: &dyn RowGroups,
1534        batch_size: usize,
1535        selection: Option<RowSelection>,
1536    ) -> Result<Self> {
1537        // note metrics are not supported in this API
1538        let metrics = ArrowReaderMetrics::disabled();
1539        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1540            .with_batch_size(batch_size)
1541            .with_parquet_metadata(row_groups.metadata())
1542            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1543
1544        let read_plan = ReadPlanBuilder::new(batch_size)
1545            .with_selection(selection)
1546            .build();
1547
1548        Ok(Self {
1549            array_reader,
1550            schema: Arc::new(Schema::new(levels.fields.clone())),
1551            read_plan,
1552        })
1553    }
1554
1555    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1556    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1557    /// all rows will be returned
1558    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1559        let schema = match array_reader.get_data_type() {
1560            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1561            _ => unreachable!("Struct array reader's data type is not struct!"),
1562        };
1563
1564        Self {
1565            array_reader,
1566            schema: Arc::new(schema),
1567            read_plan,
1568        }
1569    }
1570
1571    #[inline(always)]
1572    pub(crate) fn batch_size(&self) -> usize {
1573        self.read_plan.batch_size()
1574    }
1575}
1576
1577#[cfg(test)]
1578pub(crate) mod tests {
1579    use std::cmp::min;
1580    use std::collections::{HashMap, VecDeque};
1581    use std::fmt::Formatter;
1582    use std::fs::File;
1583    use std::io::Seek;
1584    use std::path::PathBuf;
1585    use std::sync::Arc;
1586
1587    use rand::rngs::StdRng;
1588    use rand::{Rng, RngCore, SeedableRng, random, rng};
1589    use tempfile::tempfile;
1590
1591    use crate::arrow::arrow_reader::{
1592        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1593        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1594    };
1595    use crate::arrow::schema::{
1596        add_encoded_arrow_schema_to_metadata,
1597        virtual_type::{RowGroupIndex, RowNumber},
1598    };
1599    use crate::arrow::{ArrowWriter, ProjectionMask};
1600    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1601    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1602    use crate::data_type::{
1603        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1604        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1605    };
1606    use crate::errors::Result;
1607    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1608    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1609    use crate::file::writer::SerializedFileWriter;
1610    use crate::schema::parser::parse_message_type;
1611    use crate::schema::types::{Type, TypePtr};
1612    use crate::util::test_common::rand_gen::RandGen;
1613    use arrow_array::builder::*;
1614    use arrow_array::cast::AsArray;
1615    use arrow_array::types::{
1616        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1617        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1618        Time64MicrosecondType,
1619    };
1620    use arrow_array::*;
1621    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1622    use arrow_data::{ArrayData, ArrayDataBuilder};
1623    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1624    use arrow_select::concat::concat_batches;
1625    use bytes::Bytes;
1626    use half::f16;
1627    use num_traits::PrimInt;
1628
1629    #[test]
1630    fn test_arrow_reader_all_columns() {
1631        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1632
1633        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1634        let original_schema = Arc::clone(builder.schema());
1635        let reader = builder.build().unwrap();
1636
1637        // Verify that the schema was correctly parsed
1638        assert_eq!(original_schema.fields(), reader.schema().fields());
1639    }
1640
1641    #[test]
1642    fn test_reuse_schema() {
1643        let file = get_test_file("parquet/alltypes-java.parquet");
1644
1645        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1646        let expected = builder.metadata;
1647        let schema = expected.file_metadata().schema_descr_ptr();
1648
1649        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1650        let builder =
1651            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1652
1653        // Verify that the metadata matches
1654        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1655    }
1656
1657    #[test]
1658    fn test_page_encoding_stats_mask() {
1659        let testdata = arrow::util::test_util::parquet_test_data();
1660        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1661        let file = File::open(path).unwrap();
1662
1663        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1664        let builder =
1665            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1666
1667        let row_group_metadata = builder.metadata.row_group(0);
1668
1669        // test page encoding stats
1670        let page_encoding_stats = row_group_metadata
1671            .column(0)
1672            .page_encoding_stats_mask()
1673            .unwrap();
1674        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1675        let page_encoding_stats = row_group_metadata
1676            .column(2)
1677            .page_encoding_stats_mask()
1678            .unwrap();
1679        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1680    }
1681
    /// Verifies the encoding-stats and column-stats skip policies: `SkipAll`
    /// must drop the statistics for every column, while `skip_except` must
    /// retain them only for the listed column indices.
    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        // test skipping all: no column should retain any stats
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        // test skipping all but one column (index 0) and converting to mask;
        // with the mask option enabled the raw encoding stats stay None even
        // for the retained column
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }
1723
    /// Verifies the size-statistics skip policy: `SkipAll` must drop level
    /// histograms and unencoded byte counts for every column, while
    /// `skip_except` must retain them only for the listed column indices.
    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        // test skipping all: no size statistics should survive on any column
        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        // test skipping all but one column (index 1) and converting to mask
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }
1763
1764    #[test]
1765    fn test_arrow_reader_single_column() {
1766        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1767
1768        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1769        let original_schema = Arc::clone(builder.schema());
1770
1771        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1772        let reader = builder.with_projection(mask).build().unwrap();
1773
1774        // Verify that the schema was correctly parsed
1775        assert_eq!(1, reader.schema().fields().len());
1776        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1777    }
1778
1779    #[test]
1780    fn test_arrow_reader_single_column_by_name() {
1781        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1782
1783        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1784        let original_schema = Arc::clone(builder.schema());
1785
1786        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1787        let reader = builder.with_projection(mask).build().unwrap();
1788
1789        // Verify that the schema was correctly parsed
1790        assert_eq!(1, reader.schema().fields().len());
1791        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1792    }
1793
    /// Reads a column of 7 entirely-null values (definition level 0, no data)
    /// as Arrow `Null` type with batch size 2: expects 3 full batches of 2
    /// rows plus a final batch of 1 row, all fully null.
    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        // Two row groups of 3 and 4 rows respectively; def level 0 means every
        // value is null, so the value vectors are empty
        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        // 7 rows at batch size 2 -> 3 batches of 2 rows and one of 1 row
        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }
1832
    /// Exercises the single-column reader over the primitive physical types
    /// (bool, i32, i64, f32), each with the set of encodings that type
    /// supports.
    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }
1874
    /// Exercises the single-column reader for unsigned integer columns: the
    /// values are stored as signed INT32/INT64 physical types with the
    /// UINT_32/UINT_64 converted types and reinterpreted (`as u32`/`as u64`)
    /// into the corresponding unsigned Arrow arrays.
    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }
1908
1909    #[test]
1910    fn test_unsigned_roundtrip() {
1911        let schema = Arc::new(Schema::new(vec![
1912            Field::new("uint32", ArrowDataType::UInt32, true),
1913            Field::new("uint64", ArrowDataType::UInt64, true),
1914        ]));
1915
1916        let mut buf = Vec::with_capacity(1024);
1917        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1918
1919        let original = RecordBatch::try_new(
1920            schema,
1921            vec![
1922                Arc::new(UInt32Array::from_iter_values([
1923                    0,
1924                    i32::MAX as u32,
1925                    u32::MAX,
1926                ])),
1927                Arc::new(UInt64Array::from_iter_values([
1928                    0,
1929                    i64::MAX as u64,
1930                    u64::MAX,
1931                ])),
1932            ],
1933        )
1934        .unwrap();
1935
1936        writer.write(&original).unwrap();
1937        writer.close().unwrap();
1938
1939        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1940        let ret = reader.next().unwrap().unwrap();
1941        assert_eq!(ret, original);
1942
1943        // Check they can be downcast to the correct type
1944        ret.column(0)
1945            .as_any()
1946            .downcast_ref::<UInt32Array>()
1947            .unwrap();
1948
1949        ret.column(1)
1950            .as_any()
1951            .downcast_ref::<UInt64Array>()
1952            .unwrap();
1953    }
1954
    /// Round trips `Float16` columns, including the special values (NaN,
    /// ±infinity, signed zero) and a nullable column, and checks the result
    /// both compares equal and downcasts to `Float16Array`.
    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure can be downcast to the correct type
        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }
2014
    /// Round trips `Time32(Millisecond)` and `Time64(Microsecond)` columns
    /// carrying an `adjusted_to_utc` metadata key, including out-of-range
    /// values (negative, and at/above one day) and nulls.
    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            // NOTE(review): presumably this metadata key drives the parquet
            // TIME logical type's isAdjustedToUTC flag — confirm in the writer
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // Values straddle the valid range: -1, 0, last ms of the day,
                // exactly one day, and one second past a day
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure can be downcast to the correct type
        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }
2076
2077    #[test]
2078    fn test_date32_roundtrip() -> Result<()> {
2079        use arrow_array::Date32Array;
2080
2081        let schema = Arc::new(Schema::new(vec![Field::new(
2082            "date32",
2083            ArrowDataType::Date32,
2084            false,
2085        )]));
2086
2087        let mut buf = Vec::with_capacity(1024);
2088
2089        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2090
2091        let original = RecordBatch::try_new(
2092            schema,
2093            vec![Arc::new(Date32Array::from(vec![
2094                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2095            ]))],
2096        )?;
2097
2098        writer.write(&original)?;
2099        writer.close()?;
2100
2101        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2102        let ret = reader.next().unwrap()?;
2103        assert_eq!(ret, original);
2104
2105        // Ensure can be downcast to the correct type
2106        ret.column(0).as_primitive::<Date32Type>();
2107
2108        Ok(())
2109    }
2110
    /// Round trips `Date64` with and without `coerce_types`: the default
    /// writer preserves all values exactly, while the coercing writer only
    /// preserves values that are exact multiples of a day and small enough
    /// to survive the coercion.
    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64: exact day multiples of modest magnitude
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64: exact day multiples of very large magnitude
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64: off-by-one from a day boundary, so not a
                // valid whole-day Date64
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Roundtrip should be successful when default writer used
        assert_eq!(default_ret, original);

        // Only small-date64 should roundtrip successfully when coerce_types writer is used
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        // Ensure both can be downcast to the correct type
        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }
2188    struct RandFixedLenGen {}
2189
2190    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2191        fn r#gen(len: i32) -> FixedLenByteArray {
2192            let mut v = vec![0u8; len as usize];
2193            rng().fill_bytes(&mut v);
2194            ByteArray::from(v).into()
2195        }
2196    }
2197
2198    #[test]
2199    fn test_fixed_length_binary_column_reader() {
2200        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2201            20,
2202            ConvertedType::NONE,
2203            None,
2204            |vals| {
2205                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2206                for val in vals {
2207                    match val {
2208                        Some(b) => builder.append_value(b).unwrap(),
2209                        None => builder.append_null(),
2210                    }
2211                }
2212                Arc::new(builder.finish())
2213            },
2214            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2215        );
2216    }
2217
    /// Reads 12-byte FIXED_LEN_BYTE_ARRAY values with the INTERVAL converted
    /// type into an `IntervalDayTimeArray`. In the parquet INTERVAL encoding
    /// the 12 bytes hold three little-endian u32s (months, days, millis);
    /// this test decodes bytes 4..8 as days and 8..12 as milliseconds and
    /// ignores the leading months field.
    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }
2241
    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        // Pairs an optional Arrow type hint with the conversion that builds the
        // expected array from the raw INT96 values.
        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // Test without a specified ArrowType hint.
            // With no hint the expected output is a nanosecond timestamp array.
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Test other TimeUnits as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Test another timezone with TimeUnit as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        // Run the full single-column reader suite once per hint/conversion pair.
        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }
2316
2317    #[test]
2318    fn test_int96_from_spark_file_with_provided_schema() {
2319        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2320        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2321        // microsecond resolution for the Parquet reader to interpret these values correctly.
2322        use arrow_schema::DataType::Timestamp;
2323        let test_data = arrow::util::test_util::parquet_test_data();
2324        let path = format!("{test_data}/int96_from_spark.parquet");
2325        let file = File::open(path).unwrap();
2326
2327        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2328            "a",
2329            Timestamp(TimeUnit::Microsecond, None),
2330            true,
2331        )]));
2332        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2333
2334        let mut record_reader =
2335            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2336                .unwrap()
2337                .build()
2338                .unwrap();
2339
2340        let batch = record_reader.next().unwrap().unwrap();
2341        assert_eq!(batch.num_columns(), 1);
2342        let column = batch.column(0);
2343        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2344
2345        let expected = Arc::new(Int64Array::from(vec![
2346            Some(1704141296123456),
2347            Some(1704070800000000),
2348            Some(253402225200000000),
2349            Some(1735599600000000),
2350            None,
2351            Some(9089380393200000000),
2352        ]));
2353
2354        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2355        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2356        // anyway, so this should be a zero-copy non-modifying cast.
2357
2358        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2359        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2360
2361        assert_eq!(casted_timestamps.len(), expected.len());
2362
2363        casted_timestamps
2364            .iter()
2365            .zip(expected.iter())
2366            .for_each(|(lhs, rhs)| {
2367                assert_eq!(lhs, rhs);
2368            });
2369    }
2370
    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
        // values when read as nanosecond resolution overflow and result in garbage values.
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        // With no schema hint, INT96 is read at nanosecond resolution.
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // The negative values below are the (deterministic) result of i64 overflow
        // when the microsecond values are scaled to nanoseconds.
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
            None,
            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
        ]));

        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
        // anyway, so this should be a zero-copy non-modifying cast.

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }
2416
2417    struct RandUtf8Gen {}
2418
2419    impl RandGen<ByteArrayType> for RandUtf8Gen {
2420        fn r#gen(len: i32) -> ByteArray {
2421            Int32Type::r#gen(len).to_string().as_str().into()
2422        }
2423    }
2424
    #[test]
    fn test_utf8_single_column_reader_test() {
        // Builds the expected string array (i32 or i64 offsets) from raw byte arrays.
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // Without a logical/converted type the column reads back as plain binary.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // UTF8 converted type: no hint, explicit Utf8, and explicit LargeUtf8.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // 8-bit dictionary keys can only index a small dictionary.
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                // Cannot run full test suite as keys overflow, run small test instead
                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Wider key types can hold every key the full suite generates.
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }
2540
    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // Wrap the decimals in a nullable struct; the null bitmap 0b11101111
        // (LSB-first) marks row 4 as null at the struct level.
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: 8 rows split into batches of 3, 3 and 2.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }
2576
2577    #[test]
2578    fn test_int32_nullable_struct() {
2579        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2580        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2581            "int32",
2582            int32.data_type().clone(),
2583            false,
2584        )])))
2585        .len(8)
2586        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2587        .child_data(vec![int32.into_data()])
2588        .build()
2589        .unwrap();
2590
2591        let written =
2592            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2593                .unwrap();
2594
2595        let mut buffer = Vec::with_capacity(1024);
2596        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2597        writer.write(&written).unwrap();
2598        writer.close().unwrap();
2599
2600        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2601            .unwrap()
2602            .collect::<Result<Vec<_>, _>>()
2603            .unwrap();
2604
2605        assert_eq!(&written.slice(0, 3), &read[0]);
2606        assert_eq!(&written.slice(3, 3), &read[1]);
2607        assert_eq!(&written.slice(6, 2), &read[2]);
2608    }
2609
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        // Offsets pair up as the child ranges per list entry; the LSB-first null
        // bitmap 0b01010111 marks rows 3 and 5 as null. Note the offsets skip
        // child value 5, which is therefore never referenced.
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: 7 rows split into batches of 3, 3 and 1.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
2644
2645    #[test]
2646    fn test_read_decimal_file() {
2647        use arrow_array::Decimal128Array;
2648        let testdata = arrow::util::test_util::parquet_test_data();
2649        let file_variants = vec![
2650            ("byte_array", 4),
2651            ("fixed_length", 25),
2652            ("int32", 4),
2653            ("int64", 10),
2654        ];
2655        for (prefix, target_precision) in file_variants {
2656            let path = format!("{testdata}/{prefix}_decimal.parquet");
2657            let file = File::open(path).unwrap();
2658            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2659
2660            let batch = record_reader.next().unwrap().unwrap();
2661            assert_eq!(batch.num_rows(), 24);
2662            let col = batch
2663                .column(0)
2664                .as_any()
2665                .downcast_ref::<Decimal128Array>()
2666                .unwrap();
2667
2668            let expected = 1..25;
2669
2670            assert_eq!(col.precision(), target_precision);
2671            assert_eq!(col.scale(), 2);
2672
2673            for (i, v) in expected.enumerate() {
2674                assert_eq!(col.value(i), v * 100_i128);
2675            }
2676        }
2677    }
2678
2679    #[test]
2680    fn test_read_float16_nonzeros_file() {
2681        use arrow_array::Float16Array;
2682        let testdata = arrow::util::test_util::parquet_test_data();
2683        // see https://github.com/apache/parquet-testing/pull/40
2684        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2685        let file = File::open(path).unwrap();
2686        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2687
2688        let batch = record_reader.next().unwrap().unwrap();
2689        assert_eq!(batch.num_rows(), 8);
2690        let col = batch
2691            .column(0)
2692            .as_any()
2693            .downcast_ref::<Float16Array>()
2694            .unwrap();
2695
2696        let f16_two = f16::ONE + f16::ONE;
2697
2698        assert_eq!(col.null_count(), 1);
2699        assert!(col.is_null(0));
2700        assert_eq!(col.value(1), f16::ONE);
2701        assert_eq!(col.value(2), -f16_two);
2702        assert!(col.value(3).is_nan());
2703        assert_eq!(col.value(4), f16::ZERO);
2704        assert!(col.value(4).is_sign_positive());
2705        assert_eq!(col.value(5), f16::NEG_ONE);
2706        assert_eq!(col.value(6), f16::NEG_ZERO);
2707        assert!(col.value(6).is_sign_negative());
2708        assert_eq!(col.value(7), f16_two);
2709    }
2710
2711    #[test]
2712    fn test_read_float16_zeros_file() {
2713        use arrow_array::Float16Array;
2714        let testdata = arrow::util::test_util::parquet_test_data();
2715        // see https://github.com/apache/parquet-testing/pull/40
2716        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2717        let file = File::open(path).unwrap();
2718        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2719
2720        let batch = record_reader.next().unwrap().unwrap();
2721        assert_eq!(batch.num_rows(), 3);
2722        let col = batch
2723            .column(0)
2724            .as_any()
2725            .downcast_ref::<Float16Array>()
2726            .unwrap();
2727
2728        assert_eq!(col.null_count(), 1);
2729        assert!(col.is_null(0));
2730        assert_eq!(col.value(1), f16::ZERO);
2731        assert!(col.value(1).is_sign_positive());
2732        assert!(col.value(2).is_nan());
2733    }
2734
2735    #[test]
2736    fn test_read_float32_float64_byte_stream_split() {
2737        let path = format!(
2738            "{}/byte_stream_split.zstd.parquet",
2739            arrow::util::test_util::parquet_test_data(),
2740        );
2741        let file = File::open(path).unwrap();
2742        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2743
2744        let mut row_count = 0;
2745        for batch in record_reader {
2746            let batch = batch.unwrap();
2747            row_count += batch.num_rows();
2748            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2749            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2750
2751            // This file contains floats from a standard normal distribution
2752            for &x in f32_col.values() {
2753                assert!(x > -10.0);
2754                assert!(x < 10.0);
2755            }
2756            for &x in f64_col.values() {
2757                assert!(x > -10.0);
2758                assert!(x < 10.0);
2759            }
2760        }
2761        assert_eq!(row_count, 300);
2762    }
2763
    #[test]
    fn test_read_extended_byte_stream_split() {
        // The file stores each physical type twice — once with a reference encoding
        // and once with BYTE_STREAM_SPLIT — in adjacent column pairs; each pair must
        // decode to identical values.
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // 0,1 are f16
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 2,3 are f32
            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 4,5 are f64
            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 6,7 are i32
            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 8,9 are i64
            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 10,11 are FLBA(5)
            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 12,13 are FLBA(4) (decimal(7,3))
            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }
2843
2844    #[test]
2845    fn test_read_incorrect_map_schema_file() {
2846        let testdata = arrow::util::test_util::parquet_test_data();
2847        // see https://github.com/apache/parquet-testing/pull/47
2848        let path = format!("{testdata}/incorrect_map_schema.parquet");
2849        let file = File::open(path).unwrap();
2850        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2851
2852        let batch = record_reader.next().unwrap().unwrap();
2853        assert_eq!(batch.num_rows(), 1);
2854
2855        let expected_schema = Schema::new(vec![Field::new(
2856            "my_map",
2857            ArrowDataType::Map(
2858                Arc::new(Field::new(
2859                    "key_value",
2860                    ArrowDataType::Struct(Fields::from(vec![
2861                        Field::new("key", ArrowDataType::Utf8, false),
2862                        Field::new("value", ArrowDataType::Utf8, true),
2863                    ])),
2864                    false,
2865                )),
2866                false,
2867            ),
2868            true,
2869        )]);
2870        assert_eq!(batch.schema().as_ref(), &expected_schema);
2871
2872        assert_eq!(batch.num_rows(), 1);
2873        assert_eq!(batch.column(0).null_count(), 0);
2874        assert_eq!(
2875            batch.column(0).as_map().keys().as_ref(),
2876            &StringArray::from(vec!["parent", "name"])
2877        );
2878        assert_eq!(
2879            batch.column(0).as_map().values().as_ref(),
2880            &StringArray::from(vec!["another", "report"])
2881        );
2882    }
2883
2884    #[test]
2885    fn test_read_dict_fixed_size_binary() {
2886        let schema = Arc::new(Schema::new(vec![Field::new(
2887            "a",
2888            ArrowDataType::Dictionary(
2889                Box::new(ArrowDataType::UInt8),
2890                Box::new(ArrowDataType::FixedSizeBinary(8)),
2891            ),
2892            true,
2893        )]));
2894        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2895        let values = FixedSizeBinaryArray::try_from_iter(
2896            vec![
2897                (0u8..8u8).collect::<Vec<u8>>(),
2898                (24u8..32u8).collect::<Vec<u8>>(),
2899            ]
2900            .into_iter(),
2901        )
2902        .unwrap();
2903        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2904        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2905
2906        let mut buffer = Vec::with_capacity(1024);
2907        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2908        writer.write(&batch).unwrap();
2909        writer.close().unwrap();
2910        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2911            .unwrap()
2912            .collect::<Result<Vec<_>, _>>()
2913            .unwrap();
2914
2915        assert_eq!(read.len(), 1);
2916        assert_eq!(&batch, &read[0])
2917    }
2918
    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        // The `StructArrayReader` will check the definition and repetition levels of the first
        // child column in the struct to determine nullability for the struct. If the first
        // column is being read by `ByteArrayDictionaryReader` we need to ensure that the
        // nullability is interpreted correctly from the rep/def level buffers managed by
        // that array reader.

        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        // Row 3 is null at the struct level (see the null buffer below); its child
        // values ("quebec" / "") are placeholders.
        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }
2975
    /// Parameters for single_column_reader_test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write to parquet (each holding `num_rows` rows)
        num_row_groups: usize,
        /// Total number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in column or None if required
        null_percent: Option<usize>,
        /// Set write batch size
        ///
        /// This is the number of rows that are written at once to a page and
        /// therefore acts as a bound on the page granularity of a row group
        write_batch_size: usize,
        /// Maximum size of page in bytes
        max_data_page_size: usize,
        /// Maximum size of dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// row selections and total selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// row filter
        row_filter: Option<Vec<bool>>,
        /// limit
        limit: Option<usize>,
        /// offset
        offset: Option<usize>,
    }
3012
    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                // Only report whether a selection/filter is present; their
                // full contents can be thousands of elements long
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }
3034
3035    impl Default for TestOptions {
3036        fn default() -> Self {
3037            Self {
3038                num_row_groups: 2,
3039                num_rows: 100,
3040                record_batch_size: 15,
3041                null_percent: None,
3042                write_batch_size: 64,
3043                max_data_page_size: 1024 * 1024,
3044                max_dict_page_size: 1024 * 1024,
3045                writer_version: WriterVersion::PARQUET_1_0,
3046                enabled_statistics: EnabledStatistics::Page,
3047                encoding: Encoding::PLAIN,
3048                row_selections: None,
3049                row_filter: None,
3050                limit: None,
3051                offset: None,
3052            }
3053        }
3054    }
3055
3056    impl TestOptions {
3057        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3058            Self {
3059                num_row_groups,
3060                num_rows,
3061                record_batch_size,
3062                ..Default::default()
3063            }
3064        }
3065
3066        fn with_null_percent(self, null_percent: usize) -> Self {
3067            Self {
3068                null_percent: Some(null_percent),
3069                ..self
3070            }
3071        }
3072
3073        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3074            Self {
3075                max_data_page_size,
3076                ..self
3077            }
3078        }
3079
3080        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3081            Self {
3082                max_dict_page_size,
3083                ..self
3084            }
3085        }
3086
3087        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3088            Self {
3089                enabled_statistics,
3090                ..self
3091            }
3092        }
3093
3094        fn with_row_selections(self) -> Self {
3095            assert!(self.row_filter.is_none(), "Must set row selection first");
3096
3097            let mut rng = rng();
3098            let step = rng.random_range(self.record_batch_size..self.num_rows);
3099            let row_selections = create_test_selection(
3100                step,
3101                self.num_row_groups * self.num_rows,
3102                rng.random::<bool>(),
3103            );
3104            Self {
3105                row_selections: Some(row_selections),
3106                ..self
3107            }
3108        }
3109
3110        fn with_row_filter(self) -> Self {
3111            let row_count = match &self.row_selections {
3112                Some((_, count)) => *count,
3113                None => self.num_row_groups * self.num_rows,
3114            };
3115
3116            let mut rng = rng();
3117            Self {
3118                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3119                ..self
3120            }
3121        }
3122
3123        fn with_limit(self, limit: usize) -> Self {
3124            Self {
3125                limit: Some(limit),
3126                ..self
3127            }
3128        }
3129
3130        fn with_offset(self, offset: usize) -> Self {
3131            Self {
3132                offset: Some(offset),
3133                ..self
3134            }
3135        }
3136
3137        fn writer_props(&self) -> WriterProperties {
3138            let builder = WriterProperties::builder()
3139                .set_data_page_size_limit(self.max_data_page_size)
3140                .set_write_batch_size(self.write_batch_size)
3141                .set_writer_version(self.writer_version)
3142                .set_statistics_enabled(self.enabled_statistics);
3143
3144            let builder = match self.encoding {
3145                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3146                    .set_dictionary_enabled(true)
3147                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3148                _ => builder
3149                    .set_dictionary_enabled(false)
3150                    .set_encoding(self.encoding),
3151            };
3152
3153            builder.build()
3154        }
3155    }
3156
    /// Create a parquet file and then read it using
    /// `ParquetRecordBatchReaderBuilder` over a standard matrix of
    /// parameters `opts`, repeated for each writer version and each
    /// requested encoding.
    ///
    /// `rand_max` represents the maximum size of value to pass to the
    /// value generator
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each)
            TestOptions::new(2, 100, 15),
            // choose record_batch_size (5) so batches align with row
            // group boundaries (3 row groups of 25 rows each). Tests
            // buffer refilling edge cases.
            TestOptions::new(3, 25, 5),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundary (row groups of 100 rows).
            // Tests buffer refilling edge cases.
            TestOptions::new(4, 100, 25),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127).with_null_percent(0),
            // Test optional with nulls
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Test with limit of 0
            TestOptions::new(4, 100, 25).with_limit(0),
            // Test with limit of 50
            TestOptions::new(4, 100, 25).with_limit(50),
            // Test with small limit
            TestOptions::new(4, 100, 25).with_limit(10),
            // Test with limit larger than the rows in a single row group (100)
            TestOptions::new(4, 100, 25).with_limit(101),
            // Test with limit + offset within the file
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            // Test with limit + offset equal to the rows in a row group
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            // Test with limit + offset larger than the rows in a row group
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Test with no page-level statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            // Test with no statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test with all null
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test skip

            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each)
            TestOptions::new(2, 100, 15).with_row_selections(),
            // choose record_batch_size (5) so batches align with row
            // group boundaries (3 row groups of 25 rows each). Tests
            // buffer refilling edge cases.
            TestOptions::new(3, 25, 5).with_row_selections(),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundary (row groups of 100 rows).
            // Tests buffer refilling edge cases.
            TestOptions::new(4, 100, 25).with_row_selections(),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            // Test optional with nulls
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            // Test optional with nulls and a limit
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            // Test optional with nulls, an offset and a limit
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Test filter

            // Test with row filter
            TestOptions::new(4, 100, 25).with_row_filter(),
            // Test with row selection and row filter
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            // Test with nulls and row filter
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            // Test with nulls and row filter and small pages
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            // Test with row selection and no offset index and small pages
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        // Run every option set against every writer version / encoding pair
        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }
3308
    /// Create a parquet file and then read it back using
    /// `ParquetRecordBatchReaderBuilder` with the parameters described in
    /// `opts`, asserting the decoded batches match the generated data.
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print out options to facilitate debugging failures on CI
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(),
            converted_type,
            arrow_type,
            opts
        );

        // According to null_percent, generate per-row-group definition
        // levels: 1 marks a present value, 0 marks a null
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        // Generate random table data: each row group gets only as many
        // values as it has non-null slots
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        // Fixed-length physical types need an explicit length; -1 means
        // "not applicable" for the variable-length / primitive types
        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Only request the page index when page statistics were written
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
            opts.enabled_statistics == EnabledStatistics::Page,
        ));

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        // Compute expected rows, applying the row selection (if any) to
        // the full flattened data
        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                // NOTE(review): despite the name, `skip_data` accumulates the
                // *selected* rows; skipped rows are drained and discarded
                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                // Get the flattened table data
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        // Apply the row filter (if any) to the expected data, and install an
        // ArrowPredicateFn on the reader that replays the same boolean mask
        // batch by batch
        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then(|| d))
                    .collect();

                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        // Offset and limit are applied after selection and filtering
        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        // Read the file back batch by batch, comparing each batch against the
        // corresponding slice of expected data
        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = batch.column(0);

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                // All expected rows consumed: the reader must be exhausted too
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }
3504
3505    fn gen_expected_data<T: DataType>(
3506        def_levels: Option<&Vec<Vec<i16>>>,
3507        values: &[Vec<T::T>],
3508    ) -> Vec<Option<T::T>> {
3509        let data: Vec<Option<T::T>> = match def_levels {
3510            Some(levels) => {
3511                let mut values_iter = values.iter().flatten();
3512                levels
3513                    .iter()
3514                    .flatten()
3515                    .map(|d| match d {
3516                        1 => Some(values_iter.next().cloned().unwrap()),
3517                        0 => None,
3518                        _ => unreachable!(),
3519                    })
3520                    .collect()
3521            }
3522            None => values.iter().flatten().cloned().map(Some).collect(),
3523        };
3524        data
3525    }
3526
    /// Write `values` to `file` as one row group per element of `values`,
    /// using the single-column parquet `schema` and the writer settings
    /// derived from `opts`.
    ///
    /// If an arrow `field` is provided, the corresponding arrow schema is
    /// embedded in the file metadata so readers recover the arrow type.
    /// Returns the parquet metadata produced by closing the writer.
    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<ParquetMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        for (idx, v) in values.iter().enumerate() {
            // Definition levels for this row group, if the column is optional
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }
3562
3563    fn get_test_file(file_name: &str) -> File {
3564        let mut path = PathBuf::new();
3565        path.push(arrow::util::test_util::arrow_test_data());
3566        path.push(file_name);
3567
3568        File::open(path.as_path()).expect("File not found!")
3569    }
3570
    #[test]
    fn test_read_structs() {
        // This particular test file has columns of struct types where there is
        // a column that has the same name as one of the struct fields
        // (see: ARROW-11452)
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        // Full read must not error on any batch
        for batch in record_batch_reader {
            batch.unwrap();
        }

        // Re-open and read with a projection selecting leaves 3, 8 and 10
        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        // Projected leaves map to two struct columns in the arrow schema
        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        // Tests for #1652 and #1654
        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
3623
    #[test]
    // same as test_read_structs but constructs projection mask via column names
    fn test_read_structs_by_name() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        // Full read must not error on any batch
        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Select leaves by dotted path rather than by leaf index
        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
3676
3677    #[test]
3678    fn test_read_maps() {
3679        let testdata = arrow::util::test_util::parquet_test_data();
3680        let path = format!("{testdata}/nested_maps.snappy.parquet");
3681        let file = File::open(path).unwrap();
3682        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3683
3684        for batch in record_batch_reader {
3685            batch.unwrap();
3686        }
3687    }
3688
    #[test]
    fn test_nested_nullability() {
        // A required leaf inside an optional group: nullability must surface
        // on the struct column, not on the leaf
        let message_type = "message nested {
          OPTIONAL Group group {
            REQUIRED INT32 leaf;
          }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            // Write using low-level parquet API (#1167)
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                // Def levels [0, 1, 0, 1]: four rows, with the group null in
                // rows 0 and 2 and the values 34, 76 in rows 1 and 3
                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        // Struct column is nullable, its leaf field is not
        let expected_schema = Schema::new(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]);

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        // The two rows written with def level 0 come back as struct nulls
        assert_eq!(batch.column(0).null_count(), 2);
    }
3738
    #[test]
    fn test_dictionary_preservation() {
        // Write a dictionary-encoded string column across two row groups and
        // verify which decoded batches share dictionary buffers
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        // Two row groups: 4 values then 3 values
        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        // 8 rows in the first row group (4 non-null), 9 in the second (3
        // non-null) — 17 rows in total
        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Read back in batches of 3 rows: 17 rows -> 6 batches
        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        // Extract the dictionary values buffer of a batch's column
        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // First and second batch in same row group -> same dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // Third batch spans row group -> computed dictionary
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // Fourth, fifth and sixth from same row group -> same dictionary
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }
3831
3832    #[test]
3833    fn test_read_null_list() {
3834        let testdata = arrow::util::test_util::parquet_test_data();
3835        let path = format!("{testdata}/null_list.parquet");
3836        let file = File::open(path).unwrap();
3837        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3838
3839        let batch = record_batch_reader.next().unwrap().unwrap();
3840        assert_eq!(batch.num_rows(), 1);
3841        assert_eq!(batch.num_columns(), 1);
3842        assert_eq!(batch.column(0).len(), 1);
3843
3844        let list = batch
3845            .column(0)
3846            .as_any()
3847            .downcast_ref::<ListArray>()
3848            .unwrap();
3849        assert_eq!(list.len(), 1);
3850        assert!(list.is_valid(0));
3851
3852        let val = list.value(0);
3853        assert_eq!(val.len(), 0);
3854    }
3855
3856    #[test]
3857    fn test_null_schema_inference() {
3858        let testdata = arrow::util::test_util::parquet_test_data();
3859        let path = format!("{testdata}/null_list.parquet");
3860        let file = File::open(path).unwrap();
3861
3862        let arrow_field = Field::new(
3863            "emptylist",
3864            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3865            true,
3866        );
3867
3868        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3869        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3870        let schema = builder.schema();
3871        assert_eq!(schema.fields().len(), 1);
3872        assert_eq!(schema.field(0), &arrow_field);
3873    }
3874
    #[test]
    fn test_skip_metadata() {
        // Round-trips a batch whose schema carries field-level metadata and
        // checks that `with_skip_arrow_metadata(true)` ignores the embedded
        // arrow schema, falling back to the one inferred from parquet types
        // (which carries no metadata). Verified for both writer versions.
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        // Writes `batch` with the given writer version, returning the file
        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        // Default options: the embedded arrow schema (with metadata) is used
        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        // Skipping arrow metadata: the metadata-free schema is used instead
        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }
3933
3934    fn write_parquet_from_iter<I, F>(value: I) -> File
3935    where
3936        I: IntoIterator<Item = (F, ArrayRef)>,
3937        F: AsRef<str>,
3938    {
3939        let batch = RecordBatch::try_from_iter(value).unwrap();
3940        let file = tempfile().unwrap();
3941        let mut writer =
3942            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3943        writer.write(&batch).unwrap();
3944        writer.close().unwrap();
3945        file
3946    }
3947
3948    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3949    where
3950        I: IntoIterator<Item = (F, ArrayRef)>,
3951        F: AsRef<str>,
3952    {
3953        let file = write_parquet_from_iter(value);
3954        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3955        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3956            file.try_clone().unwrap(),
3957            options_with_schema,
3958        );
3959        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3960    }
3961
3962    #[test]
3963    fn test_schema_too_few_columns() {
3964        run_schema_test_with_error(
3965            vec![
3966                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3967                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3968            ],
3969            Arc::new(Schema::new(vec![Field::new(
3970                "int64",
3971                ArrowDataType::Int64,
3972                false,
3973            )])),
3974            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3975        );
3976    }
3977
3978    #[test]
3979    fn test_schema_too_many_columns() {
3980        run_schema_test_with_error(
3981            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3982            Arc::new(Schema::new(vec![
3983                Field::new("int64", ArrowDataType::Int64, false),
3984                Field::new("int32", ArrowDataType::Int32, false),
3985            ])),
3986            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3987        );
3988    }
3989
3990    #[test]
3991    fn test_schema_mismatched_column_names() {
3992        run_schema_test_with_error(
3993            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3994            Arc::new(Schema::new(vec![Field::new(
3995                "other",
3996                ArrowDataType::Int64,
3997                false,
3998            )])),
3999            "Arrow: incompatible arrow schema, expected field named int64 got other",
4000        );
4001    }
4002
4003    #[test]
4004    fn test_schema_incompatible_columns() {
4005        run_schema_test_with_error(
4006            vec![
4007                (
4008                    "col1_invalid",
4009                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4010                ),
4011                (
4012                    "col2_valid",
4013                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4014                ),
4015                (
4016                    "col3_invalid",
4017                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4018                ),
4019            ],
4020            Arc::new(Schema::new(vec![
4021                Field::new("col1_invalid", ArrowDataType::Int32, false),
4022                Field::new("col2_valid", ArrowDataType::Int32, false),
4023                Field::new("col3_invalid", ArrowDataType::Int32, false),
4024            ])),
4025            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4026        );
4027    }
4028
    #[test]
    fn test_one_incompatible_nested_column() {
        // The file stores `nested.nested1_invalid` as Int64; the supplied
        // schema requests Int32 for that child, so the builder must report a
        // data type mismatch for the whole `nested` struct field.
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        // Same struct layout, but with the invalid child narrowed to Int32
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }
4068
4069    /// Return parquet data with a single column of utf8 strings
4070    fn utf8_parquet() -> Bytes {
4071        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4072        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4073        let props = None;
4074        // write parquet file with non nullable strings
4075        let mut parquet_data = vec![];
4076        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4077        writer.write(&batch).unwrap();
4078        writer.close().unwrap();
4079        Bytes::from(parquet_data)
4080    }
4081
4082    #[test]
4083    fn test_schema_error_bad_types() {
4084        // verify incompatible schemas error on read
4085        let parquet_data = utf8_parquet();
4086
4087        // Ask to read it back with an incompatible schema (int vs string)
4088        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4089            "column1",
4090            arrow::datatypes::DataType::Int32,
4091            false,
4092        )]));
4093
4094        // read it back out
4095        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4096        let err =
4097            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4098                .unwrap_err();
4099        assert_eq!(
4100            err.to_string(),
4101            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4102        )
4103    }
4104
4105    #[test]
4106    fn test_schema_error_bad_nullability() {
4107        // verify incompatible schemas error on read
4108        let parquet_data = utf8_parquet();
4109
4110        // Ask to read it back with an incompatible schema (nullability mismatch)
4111        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4112            "column1",
4113            arrow::datatypes::DataType::Utf8,
4114            true,
4115        )]));
4116
4117        // read it back out
4118        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4119        let err =
4120            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4121                .unwrap_err();
4122        assert_eq!(
4123            err.to_string(),
4124            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4125        )
4126    }
4127
    #[test]
    fn test_read_binary_as_utf8() {
        // Writes Binary / LargeBinary / BinaryView columns, then reads them
        // back with a supplied schema requesting the corresponding Utf8
        // types, verifying the binary -> string coercion for all three
        // layouts round-trips the values.
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        // Request the string counterpart of each binary column
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }
4201
    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        // Writes binary data that is NOT valid UTF-8, then forces a Utf8
        // schema on read; decoding is expected to panic with
        // "Invalid UTF8 sequence at".
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        // NOTE(review): the expected panic appears to originate inside
        // `next()` during UTF-8 validation, in which case the trailing
        // `unwrap_err` is never reached — confirm against the decoder
        arrow_reader.next().unwrap().unwrap_err();
    }
4229
    #[test]
    fn test_with_schema() {
        // Supplies a schema coercing: int32 -> timestamp(second, +01:00),
        // date32 -> date64, nested utf8 -> dictionary<int32, utf8> and
        // nested int64 -> timestamp(nanosecond, +10:00); verifies both the
        // reported reader schema and the decoded values after coercion.
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        // Requested (coerced) types for the nested struct's children
        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        // The reader must report exactly the supplied schema
        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        // int32 value 0 read as seconds since epoch in +01:00
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        // date32 value 0 widened to date64
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        // Dictionary values deduplicated to ["a", "b"]
        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        // Keys map the original ["a", "a", "a", "b"] onto those values
        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        // int64 value 1 read as nanoseconds since epoch in +10:00
        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }
4365
4366    #[test]
4367    fn test_empty_projection() {
4368        let testdata = arrow::util::test_util::parquet_test_data();
4369        let path = format!("{testdata}/alltypes_plain.parquet");
4370        let file = File::open(path).unwrap();
4371
4372        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4373        let file_metadata = builder.metadata().file_metadata();
4374        let expected_rows = file_metadata.num_rows() as usize;
4375
4376        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4377        let batch_reader = builder
4378            .with_projection(mask)
4379            .with_batch_size(2)
4380            .build()
4381            .unwrap();
4382
4383        let mut total_rows = 0;
4384        for maybe_batch in batch_reader {
4385            let batch = maybe_batch.unwrap();
4386            total_rows += batch.num_rows();
4387            assert_eq!(batch.num_columns(), 0);
4388            assert!(batch.num_rows() <= 2);
4389        }
4390
4391        assert_eq!(total_rows, expected_rows);
4392    }
4393
4394    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4395        let schema = Arc::new(Schema::new(vec![Field::new(
4396            "list",
4397            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4398            true,
4399        )]));
4400
4401        let mut buf = Vec::with_capacity(1024);
4402
4403        let mut writer = ArrowWriter::try_new(
4404            &mut buf,
4405            schema.clone(),
4406            Some(
4407                WriterProperties::builder()
4408                    .set_max_row_group_row_count(Some(row_group_size))
4409                    .build(),
4410            ),
4411        )
4412        .unwrap();
4413        for _ in 0..2 {
4414            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4415            for _ in 0..(batch_size) {
4416                list_builder.append(true);
4417            }
4418            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4419                .unwrap();
4420            writer.write(&batch).unwrap();
4421        }
4422        writer.close().unwrap();
4423
4424        let mut record_reader =
4425            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4426        assert_eq!(
4427            batch_size,
4428            record_reader.next().unwrap().unwrap().num_rows()
4429        );
4430        assert_eq!(
4431            batch_size,
4432            record_reader.next().unwrap().unwrap().num_rows()
4433        );
4434    }
4435
4436    #[test]
4437    fn test_row_group_exact_multiple() {
4438        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4439        test_row_group_batch(8, 8);
4440        test_row_group_batch(10, 8);
4441        test_row_group_batch(8, 10);
4442        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4443        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4444        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4445        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4446        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4447    }
4448
4449    /// Given a RecordBatch containing all the column data, return the expected batches given
4450    /// a `batch_size` and `selection`
4451    fn get_expected_batches(
4452        column: &RecordBatch,
4453        selection: &RowSelection,
4454        batch_size: usize,
4455    ) -> Vec<RecordBatch> {
4456        let mut expected_batches = vec![];
4457
4458        let mut selection: VecDeque<_> = selection.clone().into();
4459        let mut row_offset = 0;
4460        let mut last_start = None;
4461        while row_offset < column.num_rows() && !selection.is_empty() {
4462            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4463            while batch_remaining > 0 && !selection.is_empty() {
4464                let (to_read, skip) = match selection.front_mut() {
4465                    Some(selection) if selection.row_count > batch_remaining => {
4466                        selection.row_count -= batch_remaining;
4467                        (batch_remaining, selection.skip)
4468                    }
4469                    Some(_) => {
4470                        let select = selection.pop_front().unwrap();
4471                        (select.row_count, select.skip)
4472                    }
4473                    None => break,
4474                };
4475
4476                batch_remaining -= to_read;
4477
4478                match skip {
4479                    true => {
4480                        if let Some(last_start) = last_start.take() {
4481                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4482                        }
4483                        row_offset += to_read
4484                    }
4485                    false => {
4486                        last_start.get_or_insert(row_offset);
4487                        row_offset += to_read
4488                    }
4489                }
4490            }
4491        }
4492
4493        if let Some(last_start) = last_start.take() {
4494            expected_batches.push(column.slice(last_start, row_offset - last_start))
4495        }
4496
4497        // Sanity check, all batches except the final should be the batch size
4498        for batch in &expected_batches[..expected_batches.len() - 1] {
4499            assert_eq!(batch.num_rows(), batch_size);
4500        }
4501
4502        expected_batches
4503    }
4504
4505    fn create_test_selection(
4506        step_len: usize,
4507        total_len: usize,
4508        skip_first: bool,
4509    ) -> (RowSelection, usize) {
4510        let mut remaining = total_len;
4511        let mut skip = skip_first;
4512        let mut vec = vec![];
4513        let mut selected_count = 0;
4514        while remaining != 0 {
4515            let step = if remaining > step_len {
4516                step_len
4517            } else {
4518                remaining
4519            };
4520            vec.push(RowSelector {
4521                row_count: step,
4522                skip,
4523            });
4524            remaining -= step;
4525            if !skip {
4526                selected_count += step;
4527            }
4528            skip = !skip;
4529        }
4530        (vec.into(), selected_count)
4531    }
4532
4533    #[test]
4534    fn test_scan_row_with_selection() {
4535        let testdata = arrow::util::test_util::parquet_test_data();
4536        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4537        let test_file = File::open(&path).unwrap();
4538
4539        let mut serial_reader =
4540            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4541        let data = serial_reader.next().unwrap().unwrap();
4542
4543        let do_test = |batch_size: usize, selection_len: usize| {
4544            for skip_first in [false, true] {
4545                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4546
4547                let expected = get_expected_batches(&data, &selections, batch_size);
4548                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4549                assert_eq!(
4550                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4551                    expected,
4552                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4553                );
4554            }
4555        };
4556
4557        // total row count 7300
4558        // 1. test selection len more than one page row count
4559        do_test(1000, 1000);
4560
4561        // 2. test selection len less than one page row count
4562        do_test(20, 20);
4563
4564        // 3. test selection_len less than batch_size
4565        do_test(20, 5);
4566
4567        // 4. test selection_len more than batch_size
4568        // If batch_size < selection_len
4569        do_test(20, 5);
4570
4571        fn create_skip_reader(
4572            test_file: &File,
4573            batch_size: usize,
4574            selections: RowSelection,
4575        ) -> ParquetRecordBatchReader {
4576            let options =
4577                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4578            let file = test_file.try_clone().unwrap();
4579            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4580                .unwrap()
4581                .with_batch_size(batch_size)
4582                .with_row_selection(selections)
4583                .build()
4584                .unwrap()
4585        }
4586    }
4587
4588    #[test]
4589    fn test_batch_size_overallocate() {
4590        let testdata = arrow::util::test_util::parquet_test_data();
4591        // `alltypes_plain.parquet` only have 8 rows
4592        let path = format!("{testdata}/alltypes_plain.parquet");
4593        let test_file = File::open(path).unwrap();
4594
4595        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4596        let num_rows = builder.metadata.file_metadata().num_rows();
4597        let reader = builder
4598            .with_batch_size(1024)
4599            .with_projection(ProjectionMask::all())
4600            .build()
4601            .unwrap();
4602        assert_ne!(1024, num_rows);
4603        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4604    }
4605
    #[test]
    fn test_read_with_page_index_enabled() {
        // `PageIndexPolicy::Required` must load the page index when the file
        // has one, and still open successfully when it does not
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` has page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            // The offset index for the first row group must be populated
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // `alltypes_plain.parquet` doesn't have page index
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            // Although `Vec<Vec<PageLocation>>` of each row group is empty,
            // we should read the file successfully.
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }
4642
    #[test]
    fn test_raw_repetition() {
        // Writes a file with one optional and two repeated leaf columns via the
        // low-level column writer, then checks that reading each column through
        // a single-leaf projection matches the corresponding column of the
        // unprojected read.
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // column 0: a single defined value, no repetition levels
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // column 1: two values in one row (repetition levels [0, 1])
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // column 2: one value starting a new row (repetition level [0])
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        // All three columns together describe exactly one logical row
        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        // Baseline: read all three columns with no projection mask
        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // Each single-column projection must equal that column of the full read
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }
4704
4705    #[test]
4706    fn test_read_lz4_raw() {
4707        let testdata = arrow::util::test_util::parquet_test_data();
4708        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4709        let file = File::open(path).unwrap();
4710
4711        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4712            .unwrap()
4713            .collect::<Result<Vec<_>, _>>()
4714            .unwrap();
4715        assert_eq!(batches.len(), 1);
4716        let batch = &batches[0];
4717
4718        assert_eq!(batch.num_columns(), 3);
4719        assert_eq!(batch.num_rows(), 4);
4720
4721        // https://github.com/apache/parquet-testing/pull/18
4722        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4723        assert_eq!(
4724            a.values(),
4725            &[1593604800, 1593604800, 1593604801, 1593604801]
4726        );
4727
4728        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4729        let a: Vec<_> = a.iter().flatten().collect();
4730        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4731
4732        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4733        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4734    }
4735
    // This test ensures backward compatibility: it tests 2 files that both declare the LZ4
    // CompressionCodec but use different algorithms: LZ4_HADOOP and LZ4_RAW.
    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
    //    LZ4_HADOOP algorithm for compression.
    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
    //    older parquet-cpp versions.
    //
    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            // Both files must decode to the same 4-row, 3-column batch
            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }
4780
4781    #[test]
4782    fn test_read_lz4_hadoop_large() {
4783        let testdata = arrow::util::test_util::parquet_test_data();
4784        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4785        let file = File::open(path).unwrap();
4786        let expected_rows = 10000;
4787
4788        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4789            .unwrap()
4790            .collect::<Result<Vec<_>, _>>()
4791            .unwrap();
4792        assert_eq!(batches.len(), 1);
4793        let batch = &batches[0];
4794
4795        assert_eq!(batch.num_columns(), 1);
4796        assert_eq!(batch.num_rows(), expected_rows);
4797
4798        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4799        let a: Vec<_> = a.iter().flatten().collect();
4800        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4801        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4802        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4803        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4804    }
4805
4806    #[test]
4807    #[cfg(feature = "snap")]
4808    fn test_read_nested_lists() {
4809        let testdata = arrow::util::test_util::parquet_test_data();
4810        let path = format!("{testdata}/nested_lists.snappy.parquet");
4811        let file = File::open(path).unwrap();
4812
4813        let f = file.try_clone().unwrap();
4814        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4815        let expected = reader.next().unwrap().unwrap();
4816        assert_eq!(expected.num_rows(), 3);
4817
4818        let selection = RowSelection::from(vec![
4819            RowSelector::skip(1),
4820            RowSelector::select(1),
4821            RowSelector::skip(1),
4822        ]);
4823        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4824            .unwrap()
4825            .with_row_selection(selection)
4826            .build()
4827            .unwrap();
4828
4829        let actual = reader.next().unwrap().unwrap();
4830        assert_eq!(actual.num_rows(), 1);
4831        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4832    }
4833
4834    #[test]
4835    fn test_arbitrary_decimal() {
4836        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4837        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4838            .with_precision_and_scale(19, 0)
4839            .unwrap();
4840        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4841            .with_precision_and_scale(12, 0)
4842            .unwrap();
4843        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4844            .with_precision_and_scale(17, 10)
4845            .unwrap();
4846
4847        let written = RecordBatch::try_from_iter([
4848            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4849            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4850            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4851        ])
4852        .unwrap();
4853
4854        let mut buffer = Vec::with_capacity(1024);
4855        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4856        writer.write(&written).unwrap();
4857        writer.close().unwrap();
4858
4859        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4860            .unwrap()
4861            .collect::<Result<Vec<_>, _>>()
4862            .unwrap();
4863
4864        assert_eq!(&written.slice(0, 8), &read[0]);
4865    }
4866
    #[test]
    fn test_list_skip() {
        // Verifies row skipping over a list column when a data page holds more
        // leaf values than rows.
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // First page contains 2 values but only 1 row
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Skip the first two rows; only the final single-element list remains
        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }
4897
4898    fn test_decimal32_roundtrip() {
4899        let d = |values: Vec<i32>, p: u8| {
4900            let iter = values.into_iter();
4901            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4902                .with_precision_and_scale(p, 2)
4903                .unwrap()
4904        };
4905
4906        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4907        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4908
4909        let mut buffer = Vec::with_capacity(1024);
4910        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4911        writer.write(&batch).unwrap();
4912        writer.close().unwrap();
4913
4914        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4915        let t1 = builder.parquet_schema().columns()[0].physical_type();
4916        assert_eq!(t1, PhysicalType::INT32);
4917
4918        let mut reader = builder.build().unwrap();
4919        assert_eq!(batch.schema(), reader.schema());
4920
4921        let out = reader.next().unwrap().unwrap();
4922        assert_eq!(batch, out);
4923    }
4924
    // Verifies that Decimal64 columns round-trip through the physical type
    // chosen by precision. Called from `test_decimal`.
    fn test_decimal64_roundtrip() {
        // Precision <= 9 -> INT32
        // Precision <= 18 -> INT64

        // Helper: builds a Decimal64 array with the given precision, scale 2
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        // One column per physical-type bucket, each ending at its max magnitude
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // The written file must use the expected physical type per column
        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        // Reading back must reproduce the original schema and values
        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }
4966
    // Generic round-trip check for wide decimal types (Decimal128/Decimal256),
    // covering all three physical-type buckets. Called from `test_decimal`.
    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9 -> INT32
        // Precision <= 18 -> INT64
        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY

        // Helper: builds a decimal array of T with the given precision, scale 2
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        // One column per physical-type bucket
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // The written file must use the expected physical type per column
        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        // Reading back must reproduce the original schema and values
        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }
5013
    #[test]
    fn test_decimal() {
        // Round-trip every decimal width through its parquet physical types
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }
5021
5022    #[test]
5023    fn test_list_selection() {
5024        let schema = Arc::new(Schema::new(vec![Field::new_list(
5025            "list",
5026            Field::new_list_field(ArrowDataType::Utf8, true),
5027            false,
5028        )]));
5029        let mut buf = Vec::with_capacity(1024);
5030
5031        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5032
5033        for i in 0..2 {
5034            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5035            for j in 0..1024 {
5036                list_a_builder.values().append_value(format!("{i} {j}"));
5037                list_a_builder.append(true);
5038            }
5039            let batch =
5040                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5041                    .unwrap();
5042            writer.write(&batch).unwrap();
5043        }
5044        let _metadata = writer.close().unwrap();
5045
5046        let buf = Bytes::from(buf);
5047        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5048            .unwrap()
5049            .with_row_selection(RowSelection::from(vec![
5050                RowSelector::skip(100),
5051                RowSelector::select(924),
5052                RowSelector::skip(100),
5053                RowSelector::select(924),
5054            ]))
5055            .build()
5056            .unwrap();
5057
5058        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5059        let batch = concat_batches(&schema, &batches).unwrap();
5060
5061        assert_eq!(batch.num_rows(), 924 * 2);
5062        let list = batch.column(0).as_list::<i32>();
5063
5064        for w in list.value_offsets().windows(2) {
5065            assert_eq!(w[0] + 1, w[1])
5066        }
5067        let mut values = list.values().as_string::<i32>().iter();
5068
5069        for i in 0..2 {
5070            for j in 100..1024 {
5071                let expected = format!("{i} {j}");
5072                assert_eq!(values.next().unwrap().unwrap(), &expected);
5073            }
5074        }
5075    }
5076
    #[test]
    fn test_list_selection_fuzz() {
        // Fuzz test: builds a random list<list<int32>> column (2048 rows with
        // random nulls and lengths), then checks several row selections across
        // several batch sizes against slices of the original batch.
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        // Each level independently has a 20% chance of being null; inner
        // lengths are drawn uniformly from 0..10
        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        // Selections exercising skip-first/select-first orderings and
        // single-row selections at chunk boundaries
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Walk the selectors: each selected run of the original batch
                // must appear contiguously in the selected output
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
5190
    #[test]
    fn test_read_old_nested_list() {
        // Verifies that the legacy (pre-LIST-annotation) nested-list layout is
        // read as list<list<int32>>.
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        // message my_record {
        //     REQUIRED group a (LIST) {
        //         REPEATED group array (LIST) {
        //             REPEATED INT32 array;
        //         }
        //     }
        // }
        // should be read as list<list<int32>>
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // create expected ListArray
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // grab first column
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        // get first row: [[1, 2], [3, 4]]
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        // The single row must equal the expected inner list array
        assert_eq!(r0arr, &a);
    }
5240
5241    #[test]
5242    fn test_map_no_value() {
5243        // File schema:
5244        // message schema {
5245        //   required group my_map (MAP) {
5246        //     repeated group key_value {
5247        //       required int32 key;
5248        //       optional int32 value;
5249        //     }
5250        //   }
5251        //   required group my_map_no_v (MAP) {
5252        //     repeated group key_value {
5253        //       required int32 key;
5254        //     }
5255        //   }
5256        //   required group my_list (LIST) {
5257        //     repeated group list {
5258        //       required int32 element;
5259        //     }
5260        //   }
5261        // }
5262        let testdata = arrow::util::test_util::parquet_test_data();
5263        let path = format!("{testdata}/map_no_value.parquet");
5264        let file = File::open(path).unwrap();
5265
5266        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5267            .unwrap()
5268            .build()
5269            .unwrap();
5270        let out = reader.next().unwrap().unwrap();
5271        assert_eq!(out.num_rows(), 3);
5272        assert_eq!(out.num_columns(), 3);
5273        // my_map_no_v and my_list columns should now be equivalent
5274        let c0 = out.column(1).as_list::<i32>();
5275        let c1 = out.column(2).as_list::<i32>();
5276        assert_eq!(c0.len(), c1.len());
5277        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5278    }
5279
5280    #[test]
5281    fn test_read_unknown_logical_type() {
5282        let testdata = arrow::util::test_util::parquet_test_data();
5283        let path = format!("{testdata}/unknown-logical-type.parquet");
5284        let test_file = File::open(path).unwrap();
5285
5286        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5287            .expect("Error creating reader builder");
5288
5289        let schema = builder.metadata().file_metadata().schema_descr();
5290        assert_eq!(
5291            schema.column(0).logical_type_ref(),
5292            Some(&LogicalType::String)
5293        );
5294        assert_eq!(
5295            schema.column(1).logical_type_ref(),
5296            Some(&LogicalType::_Unknown { field_id: 2555 })
5297        );
5298        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5299
5300        let mut reader = builder.build().unwrap();
5301        let out = reader.next().unwrap().unwrap();
5302        assert_eq!(out.num_rows(), 3);
5303        assert_eq!(out.num_columns(), 2);
5304    }
5305
    #[test]
    fn test_read_row_numbers() {
        // Reads a physical column alongside a virtual row-number column and
        // checks both the combined schema and the values of each column.
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        // Virtual column carrying the `RowNumber` extension type
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        // Expected schema: the physical column followed by the virtual one
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        // Physical column keeps its written values...
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        // ...while the virtual column yields 0-based row numbers
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }
5356
5357    #[test]
5358    fn test_read_only_row_numbers() {
5359        let file = write_parquet_from_iter(vec![(
5360            "value",
5361            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5362        )]);
5363        let row_number_field = Arc::new(
5364            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5365        );
5366        let options = ArrowReaderOptions::new()
5367            .with_virtual_columns(vec![row_number_field.clone()])
5368            .unwrap();
5369        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5370        let num_columns = metadata
5371            .metadata
5372            .file_metadata()
5373            .schema_descr()
5374            .num_columns();
5375
5376        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5377            .with_projection(ProjectionMask::none(num_columns))
5378            .build()
5379            .expect("reader with schema");
5380
5381        let batch = arrow_reader.next().unwrap().unwrap();
5382        let schema = Arc::new(Schema::new(vec![row_number_field]));
5383
5384        assert_eq!(batch.schema(), schema);
5385        assert_eq!(batch.num_columns(), 1);
5386        assert_eq!(batch.num_rows(), 3);
5387        assert_eq!(
5388            batch
5389                .column(0)
5390                .as_primitive::<types::Int64Type>()
5391                .iter()
5392                .collect::<Vec<_>>(),
5393            vec![Some(0), Some(1), Some(2)]
5394        );
5395    }
5396
5397    #[test]
5398    fn test_read_row_numbers_row_group_order() -> Result<()> {
5399        // Make a parquet file with 100 rows split across 2 row groups
5400        let array = Int64Array::from_iter_values(5000..5100);
5401        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5402        let mut buffer = Vec::new();
5403        let options = WriterProperties::builder()
5404            .set_max_row_group_row_count(Some(50))
5405            .build();
5406        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5407        // write in 10 row batches as the size limits are enforced after each batch
5408        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5409            writer.write(&batch_chunk)?;
5410        }
5411        writer.close()?;
5412
5413        let row_number_field = Arc::new(
5414            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5415        );
5416
5417        let buffer = Bytes::from(buffer);
5418
5419        let options =
5420            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5421
5422        // read out with normal options
5423        let arrow_reader =
5424            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5425                .build()?;
5426
5427        assert_eq!(
5428            ValuesAndRowNumbers {
5429                values: (5000..5100).collect(),
5430                row_numbers: (0..100).collect()
5431            },
5432            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5433        );
5434
5435        // Now read, out of order row groups
5436        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5437            .with_row_groups(vec![1, 0])
5438            .build()?;
5439
5440        assert_eq!(
5441            ValuesAndRowNumbers {
5442                values: (5050..5100).chain(5000..5050).collect(),
5443                row_numbers: (50..100).chain(0..50).collect(),
5444            },
5445            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5446        );
5447
5448        Ok(())
5449    }
5450
    /// Flattened contents of a read used to compare expected vs. actual
    /// results of the row-number tests above.
    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        // contents of the "col" data column
        values: Vec<i64>,
        // contents of the "row_number" virtual column
        row_numbers: Vec<i64>,
    }
5456    impl ValuesAndRowNumbers {
5457        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5458            let mut values = vec![];
5459            let mut row_numbers = vec![];
5460            for batch in reader {
5461                let batch = batch.expect("Could not read batch");
5462                values.extend(
5463                    batch
5464                        .column_by_name("col")
5465                        .expect("Could not get col column")
5466                        .as_primitive::<arrow::datatypes::Int64Type>()
5467                        .iter()
5468                        .map(|v| v.expect("Could not get value")),
5469                );
5470
5471                row_numbers.extend(
5472                    batch
5473                        .column_by_name("row_number")
5474                        .expect("Could not get row_number column")
5475                        .as_primitive::<arrow::datatypes::Int64Type>()
5476                        .iter()
5477                        .map(|v| v.expect("Could not get row number"))
5478                        .collect::<Vec<_>>(),
5479                );
5480            }
5481            Self {
5482                values,
5483                row_numbers,
5484            }
5485        }
5486    }
5487
5488    #[test]
5489    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5490        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5491        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5492        assert_eq!(
5493            ArrowReaderOptions::new()
5494                .with_virtual_columns(vec![regular_field])
5495                .unwrap_err()
5496                .to_string(),
5497            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5498        );
5499    }
5500
5501    #[test]
5502    fn test_row_numbers_with_multiple_row_groups() {
5503        test_row_numbers_with_multiple_row_groups_helper(
5504            false,
5505            |path, selection, _row_filter, batch_size| {
5506                let file = File::open(path).unwrap();
5507                let row_number_field = Arc::new(
5508                    Field::new("row_number", ArrowDataType::Int64, false)
5509                        .with_extension_type(RowNumber),
5510                );
5511                let options = ArrowReaderOptions::new()
5512                    .with_virtual_columns(vec![row_number_field])
5513                    .unwrap();
5514                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5515                    .unwrap()
5516                    .with_row_selection(selection)
5517                    .with_batch_size(batch_size)
5518                    .build()
5519                    .expect("Could not create reader");
5520                reader
5521                    .collect::<Result<Vec<_>, _>>()
5522                    .expect("Could not read")
5523            },
5524        );
5525    }
5526
5527    #[test]
5528    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5529        test_row_numbers_with_multiple_row_groups_helper(
5530            true,
5531            |path, selection, row_filter, batch_size| {
5532                let file = File::open(path).unwrap();
5533                let row_number_field = Arc::new(
5534                    Field::new("row_number", ArrowDataType::Int64, false)
5535                        .with_extension_type(RowNumber),
5536                );
5537                let options = ArrowReaderOptions::new()
5538                    .with_virtual_columns(vec![row_number_field])
5539                    .unwrap();
5540                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5541                    .unwrap()
5542                    .with_row_selection(selection)
5543                    .with_batch_size(batch_size)
5544                    .with_row_filter(row_filter.expect("No filter"))
5545                    .build()
5546                    .expect("Could not create reader");
5547                reader
5548                    .collect::<Result<Vec<_>, _>>()
5549                    .expect("Could not read")
5550            },
5551        );
5552    }
5553
5554    #[test]
5555    fn test_read_row_group_indices() {
5556        // create a parquet file with 3 row groups, 2 rows each
5557        let array1 = Int64Array::from(vec![1, 2]);
5558        let array2 = Int64Array::from(vec![3, 4]);
5559        let array3 = Int64Array::from(vec![5, 6]);
5560
5561        let batch1 =
5562            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5563        let batch2 =
5564            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5565        let batch3 =
5566            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5567
5568        let mut buffer = Vec::new();
5569        let options = WriterProperties::builder()
5570            .set_max_row_group_row_count(Some(2))
5571            .build();
5572        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5573        writer.write(&batch1).unwrap();
5574        writer.write(&batch2).unwrap();
5575        writer.write(&batch3).unwrap();
5576        writer.close().unwrap();
5577
5578        let file = Bytes::from(buffer);
5579        let row_group_index_field = Arc::new(
5580            Field::new("row_group_index", ArrowDataType::Int64, false)
5581                .with_extension_type(RowGroupIndex),
5582        );
5583
5584        let options = ArrowReaderOptions::new()
5585            .with_virtual_columns(vec![row_group_index_field.clone()])
5586            .unwrap();
5587        let mut arrow_reader =
5588            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5589                .expect("reader builder with virtual columns")
5590                .build()
5591                .expect("reader with virtual columns");
5592
5593        let batch = arrow_reader.next().unwrap().unwrap();
5594
5595        assert_eq!(batch.num_columns(), 2);
5596        assert_eq!(batch.num_rows(), 6);
5597
5598        assert_eq!(
5599            batch
5600                .column(0)
5601                .as_primitive::<types::Int64Type>()
5602                .iter()
5603                .collect::<Vec<_>>(),
5604            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5605        );
5606
5607        assert_eq!(
5608            batch
5609                .column(1)
5610                .as_primitive::<types::Int64Type>()
5611                .iter()
5612                .collect::<Vec<_>>(),
5613            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5614        );
5615    }
5616
5617    #[test]
5618    fn test_read_only_row_group_indices() {
5619        let array1 = Int64Array::from(vec![1, 2, 3]);
5620        let array2 = Int64Array::from(vec![4, 5]);
5621
5622        let batch1 =
5623            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5624        let batch2 =
5625            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5626
5627        let mut buffer = Vec::new();
5628        let options = WriterProperties::builder()
5629            .set_max_row_group_row_count(Some(3))
5630            .build();
5631        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5632        writer.write(&batch1).unwrap();
5633        writer.write(&batch2).unwrap();
5634        writer.close().unwrap();
5635
5636        let file = Bytes::from(buffer);
5637        let row_group_index_field = Arc::new(
5638            Field::new("row_group_index", ArrowDataType::Int64, false)
5639                .with_extension_type(RowGroupIndex),
5640        );
5641
5642        let options = ArrowReaderOptions::new()
5643            .with_virtual_columns(vec![row_group_index_field.clone()])
5644            .unwrap();
5645        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5646        let num_columns = metadata
5647            .metadata
5648            .file_metadata()
5649            .schema_descr()
5650            .num_columns();
5651
5652        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5653            .with_projection(ProjectionMask::none(num_columns))
5654            .build()
5655            .expect("reader with virtual columns only");
5656
5657        let batch = arrow_reader.next().unwrap().unwrap();
5658        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5659
5660        assert_eq!(batch.schema(), schema);
5661        assert_eq!(batch.num_columns(), 1);
5662        assert_eq!(batch.num_rows(), 5);
5663
5664        assert_eq!(
5665            batch
5666                .column(0)
5667                .as_primitive::<types::Int64Type>()
5668                .iter()
5669                .collect::<Vec<_>>(),
5670            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5671        );
5672    }
5673
5674    #[test]
5675    fn test_read_row_group_indices_with_selection() -> Result<()> {
5676        let mut buffer = Vec::new();
5677        let options = WriterProperties::builder()
5678            .set_max_row_group_row_count(Some(10))
5679            .build();
5680
5681        let schema = Arc::new(Schema::new(vec![Field::new(
5682            "value",
5683            ArrowDataType::Int64,
5684            false,
5685        )]));
5686
5687        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5688
5689        // write out 3 batches of 10 rows each
5690        for i in 0..3 {
5691            let start = i * 10;
5692            let array = Int64Array::from_iter_values(start..start + 10);
5693            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5694            writer.write(&batch)?;
5695        }
5696        writer.close()?;
5697
5698        let file = Bytes::from(buffer);
5699        let row_group_index_field = Arc::new(
5700            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5701        );
5702
5703        let options =
5704            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5705
5706        // test row groups are read in reverse order
5707        let arrow_reader =
5708            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5709                .with_row_groups(vec![2, 1, 0])
5710                .build()?;
5711
5712        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5713        let combined = concat_batches(&batches[0].schema(), &batches)?;
5714
5715        let values = combined.column(0).as_primitive::<types::Int64Type>();
5716        let first_val = values.value(0);
5717        let last_val = values.value(combined.num_rows() - 1);
5718        // first row from rg 2
5719        assert_eq!(first_val, 20);
5720        // the last row from rg 0
5721        assert_eq!(last_val, 9);
5722
5723        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5724        assert_eq!(rg_indices.value(0), 2);
5725        assert_eq!(rg_indices.value(10), 1);
5726        assert_eq!(rg_indices.value(20), 0);
5727
5728        Ok(())
5729    }
5730
5731    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5732        use_filter: bool,
5733        test_case: F,
5734    ) where
5735        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5736    {
5737        let seed: u64 = random();
5738        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5739        let mut rng = StdRng::seed_from_u64(seed);
5740
5741        use tempfile::TempDir;
5742        let tempdir = TempDir::new().expect("Could not create temp dir");
5743
5744        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5745
5746        let path = tempdir.path().join("test.parquet");
5747        std::fs::write(&path, bytes).expect("Could not write file");
5748
5749        let mut case = vec![];
5750        let mut remaining = metadata.file_metadata().num_rows();
5751        while remaining > 0 {
5752            let row_count = rng.random_range(1..=remaining);
5753            remaining -= row_count;
5754            case.push(RowSelector {
5755                row_count: row_count as usize,
5756                skip: rng.random_bool(0.5),
5757            });
5758        }
5759
5760        let filter = use_filter.then(|| {
5761            let filter = (0..metadata.file_metadata().num_rows())
5762                .map(|_| rng.random_bool(0.99))
5763                .collect::<Vec<_>>();
5764            let mut filter_offset = 0;
5765            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5766                ProjectionMask::all(),
5767                move |b| {
5768                    let array = BooleanArray::from_iter(
5769                        filter
5770                            .iter()
5771                            .skip(filter_offset)
5772                            .take(b.num_rows())
5773                            .map(|x| Some(*x)),
5774                    );
5775                    filter_offset += b.num_rows();
5776                    Ok(array)
5777                },
5778            ))])
5779        });
5780
5781        let selection = RowSelection::from(case);
5782        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5783
5784        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5785            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5786            return;
5787        }
5788        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5789            .expect("Failed to concatenate");
5790        // assert_eq!(selection.row_count(), actual.num_rows());
5791        let values = actual
5792            .column(0)
5793            .as_primitive::<types::Int64Type>()
5794            .iter()
5795            .collect::<Vec<_>>();
5796        let row_numbers = actual
5797            .column(1)
5798            .as_primitive::<types::Int64Type>()
5799            .iter()
5800            .collect::<Vec<_>>();
5801        assert_eq!(
5802            row_numbers
5803                .into_iter()
5804                .map(|number| number.map(|number| number + 1))
5805                .collect::<Vec<_>>(),
5806            values
5807        );
5808    }
5809
5810    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5811        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5812            "value",
5813            ArrowDataType::Int64,
5814            false,
5815        )])));
5816
5817        let mut buf = Vec::with_capacity(1024);
5818        let mut writer =
5819            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5820
5821        let mut values = 1..=rng.random_range(1..4096);
5822        while !values.is_empty() {
5823            let batch_values = values
5824                .by_ref()
5825                .take(rng.random_range(1..4096))
5826                .collect::<Vec<_>>();
5827            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5828            let batch =
5829                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5830            writer.write(&batch).expect("Could not write batch");
5831            writer.flush().expect("Could not flush");
5832        }
5833        let metadata = writer.close().expect("Could not close writer");
5834
5835        (Bytes::from(buf), metadata)
5836    }
5837}