// parquet/arrow/arrow_reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains reader which reads parquet data into arrow [`RecordBatch`]
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
72/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
79/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
80/// process, which is efficient and allows the most low level optimizations.
81///
82/// However, most Parquet based systems will apply filters at many steps prior
83/// to decoding such as pruning files, row groups and data pages. This crate
84/// provides the low level APIs needed to implement such filtering, but does not
85/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
94/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
199    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
200    /// If the batch_size more than the file row count, use the file row count.
201    pub fn with_batch_size(self, batch_size: usize) -> Self {
202        // Try to avoid allocate large buffer
203        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
204        Self { batch_size, ..self }
205    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
225    /// Configure how row selections should be materialised during execution
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
250    /// It is recommended to enable writing the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
307    ///
308    /// See the [blog post on late materialization] for a more technical explanation.
309    ///
310    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
311    ///
312    /// # Example
313    /// ```rust
314    /// # use std::fs::File;
315    /// # use arrow_array::Int32Array;
316    /// # use parquet::arrow::ProjectionMask;
317    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
318    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
319    /// # let testdata = arrow::util::test_util::parquet_test_data();
320    /// # let path = format!("{testdata}/alltypes_plain.parquet");
321    /// # let file = File::open(&path)?;
322    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
323    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
324    /// // Create predicate that evaluates `int_col != 1`.
325    /// // `int_col` column has index 4 (zero based) in the schema
326    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
327    /// // Only the projection columns are passed to the predicate so
328    /// // int_col is column 0 in the predicate
329    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
330    ///     let int_col = batch.column(0);
331    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
332    /// });
333    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
334    /// // The filter will be invoked during the reading process
335    /// let reader = builder.with_row_filter(row_filter).build()?;
336    /// # for b in reader { let _ = b?; }
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn with_row_filter(self, filter: RowFilter) -> Self {
341        Self {
342            filter: Some(filter),
343            ..self
344        }
345    }
346
347    /// Provide a limit to the number of rows to be read
348    ///
349    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
350    /// allowing it to limit the final set of rows decoded after any pushed down predicates
351    ///
352    /// It is recommended to enable reading the page index if using this functionality, to allow
353    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
354    pub fn with_limit(self, limit: usize) -> Self {
355        Self {
356            limit: Some(limit),
357            ..self
358        }
359    }
360
361    /// Provide an offset to skip over the given number of rows
362    ///
363    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
364    /// allowing it to skip rows after any pushed down predicates
365    ///
366    /// It is recommended to enable reading the page index if using this functionality, to allow
367    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
368    pub fn with_offset(self, offset: usize) -> Self {
369        Self {
370            offset: Some(offset),
371            ..self
372        }
373    }
374
375    /// Specify metrics collection during reading
376    ///
377    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
378    /// clone of the provided metrics to the builder.
379    ///
380    /// For example:
381    ///
382    /// ```rust
383    /// # use std::sync::Arc;
384    /// # use bytes::Bytes;
385    /// # use arrow_array::{Int32Array, RecordBatch};
386    /// # use arrow_schema::{DataType, Field, Schema};
387    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
388    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
389    /// # use parquet::arrow::ArrowWriter;
390    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// # let file = Bytes::from(file);
397    /// // Create metrics object to pass into the reader
398    /// let metrics = ArrowReaderMetrics::enabled();
399    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
400    ///   // Configure the builder to use the metrics by passing a clone
401    ///   .with_metrics(metrics.clone())
402    ///   // Build the reader
403    ///   .build().unwrap();
404    /// // .. read data from the reader ..
405    ///
406    /// // check the metrics
407    /// assert!(metrics.records_read_from_inner().is_some());
408    /// ```
409    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
410        Self { metrics, ..self }
411    }
412
413    /// Set the maximum size (per row group) of the predicate cache in bytes for
414    /// the async decoder.
415    ///
416    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
417    /// unlimited cache size.
418    ///
419    /// This cache is used to store decoded arrays that are used in
420    /// predicate evaluation ([`Self::with_row_filter`]).
421    ///
422    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
423    /// [this ticket] for more details and alternatives.
424    ///
425    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
426    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
427    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
428        Self {
429            max_predicate_cache_size,
430            ..self
431        }
432    }
433}
434
435/// Options that control how [`ParquetMetaData`] is read when constructing
436/// an Arrow reader.
437///
438/// To use these options, pass them to one of the following methods:
439/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
440/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
441///
442/// For fine-grained control over metadata loading, use
443/// [`ArrowReaderMetadata::load`] to load metadata with these options,
444///
445/// See [`ArrowReaderBuilder`] for how to configure how the column data
446/// is then read from the file, including projection and filter pushdown
447///
448/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
449#[derive(Debug, Clone, Default)]
450pub struct ArrowReaderOptions {
451    /// Should the reader strip any user defined metadata from the Arrow schema
452    skip_arrow_metadata: bool,
453    /// If provided, used as the schema hint when determining the Arrow schema,
454    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
455    ///
456    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
457    supplied_schema: Option<SchemaRef>,
458
459    pub(crate) column_index: PageIndexPolicy,
460    pub(crate) offset_index: PageIndexPolicy,
461
462    /// Options to control reading of Parquet metadata
463    metadata_options: ParquetMetaDataOptions,
464    /// If encryption is enabled, the file decryption properties can be provided
465    #[cfg(feature = "encryption")]
466    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
467
468    virtual_columns: Vec<FieldRef>,
469}
470
471impl ArrowReaderOptions {
472    /// Create a new [`ArrowReaderOptions`] with the default settings
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    /// Skip decoding the embedded arrow metadata (defaults to `false`)
478    ///
479    /// Parquet files generated by some writers may contain embedded arrow
480    /// schema and metadata.
481    /// This may not be correct or compatible with your system,
482    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
483    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
484        Self {
485            skip_arrow_metadata,
486            ..self
487        }
488    }
489
490    /// Provide a schema hint to use when reading the Parquet file.
491    ///
492    /// If provided, this schema takes precedence over any arrow schema embedded
493    /// in the metadata (see the [`arrow`] documentation for more details).
494    ///
495    /// If the provided schema is not compatible with the data stored in the
496    /// parquet file schema, an error will be returned when constructing the
497    /// builder.
498    ///
499    /// This option is only required if you want to explicitly control the
500    /// conversion of Parquet types to Arrow types, such as casting a column to
501    /// a different type. For example, if you wanted to read an Int64 in
502    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
503    ///
504    /// [`arrow`]: crate::arrow
505    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
506    ///
507    /// # Notes
508    ///
509    /// The provided schema must have the same number of columns as the parquet schema and
510    /// the column names must be the same.
511    ///
512    /// # Example
513    /// ```
514    /// # use std::sync::Arc;
515    /// # use bytes::Bytes;
516    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
517    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
518    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
519    /// # use parquet::arrow::ArrowWriter;
520    /// // Write data - schema is inferred from the data to be Int32
521    /// let mut file = Vec::new();
522    /// let batch = RecordBatch::try_from_iter(vec![
523    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
524    /// ]).unwrap();
525    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
526    /// writer.write(&batch).unwrap();
527    /// writer.close().unwrap();
528    /// let file = Bytes::from(file);
529    ///
530    /// // Read the file back.
531    /// // Supply a schema that interprets the Int32 column as a Timestamp.
532    /// let supplied_schema = Arc::new(Schema::new(vec![
533    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
534    /// ]));
535    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
536    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
537    ///     file.clone(),
538    ///     options
539    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
540    ///
541    /// // Create the reader and read the data using the supplied schema.
542    /// let mut reader = builder.build().unwrap();
543    /// let _batch = reader.next().unwrap().unwrap();
544    /// ```
545    ///
546    /// # Example: Preserving Dictionary Encoding
547    ///
548    /// By default, Parquet string columns are read as `Utf8Array` (or `LargeUtf8Array`),
549    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
550    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
551    ///
552    /// ```
553    /// # use std::sync::Arc;
554    /// # use bytes::Bytes;
555    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
556    /// # use arrow_schema::{DataType, Field, Schema};
557    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
558    /// # use parquet::arrow::ArrowWriter;
559    /// // Write a Parquet file with string data
560    /// let mut file = Vec::new();
561    /// let schema = Arc::new(Schema::new(vec![
562    ///     Field::new("city", DataType::Utf8, false)
563    /// ]));
564    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
565    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
566    ///
567    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
568    /// writer.write(&batch).unwrap();
569    /// writer.close().unwrap();
570    /// let file = Bytes::from(file);
571    ///
572    /// // Read the file back, requesting dictionary encoding preservation
573    /// let dict_schema = Arc::new(Schema::new(vec![
574    ///     Field::new("city", DataType::Dictionary(
575    ///         Box::new(DataType::Int32),
576    ///         Box::new(DataType::Utf8)
577    ///     ), false)
578    /// ]));
579    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
580    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
581    ///     file.clone(),
582    ///     options
583    /// ).unwrap();
584    ///
585    /// let mut reader = builder.build().unwrap();
586    /// let batch = reader.next().unwrap().unwrap();
587    ///
588    /// // The column is now a DictionaryArray
589    /// assert!(matches!(
590    ///     batch.column(0).data_type(),
591    ///     DataType::Dictionary(_, _)
592    /// ));
593    /// ```
594    ///
595    /// **Note**: Dictionary encoding preservation works best when:
596    /// 1. The original column was dictionary encoded (the default for string columns)
597    /// 2. There are a small number of distinct values
598    pub fn with_schema(self, schema: SchemaRef) -> Self {
599        Self {
600            supplied_schema: Some(schema),
601            skip_arrow_metadata: true,
602            ..self
603        }
604    }
605
606    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
607    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
608    ///
609    /// The `PageIndex` can be used to push down predicates to the parquet scan,
610    /// potentially eliminating unnecessary IO, by some query engines.
611    ///
612    /// If this is enabled, [`ParquetMetaData::column_index`] and
613    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
614    /// information is present in the file.
615    ///
616    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
617    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
618    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
619    pub fn with_page_index(self, page_index: bool) -> Self {
620        self.with_page_index_policy(PageIndexPolicy::from(page_index))
621    }
622
623    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
624    ///
625    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
626    /// This method sets the same policy for both. For fine-grained control, use
627    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
628    ///
629    /// See [`Self::with_page_index`] for more details on page indexes.
630    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
631        self.with_column_index_policy(policy)
632            .with_offset_index_policy(policy)
633    }
634
635    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
636    ///
637    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
638    /// for predicate pushdown and page-level pruning.
639    ///
640    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
641    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
642        self.column_index = policy;
643        self
644    }
645
646    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
647    ///
648    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
649    /// efficient page-level skipping and random access within column chunks.
650    ///
651    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
652    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
653        self.offset_index = policy;
654        self
655    }
656
657    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
658    /// footer will be skipped.
659    ///
660    /// This can be used to avoid reparsing the schema from the file when it is
661    /// already known.
662    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
663        self.metadata_options.set_schema(schema);
664        self
665    }
666
667    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
668    /// (defaults to `false`).
669    ///
670    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
671    /// might be desirable.
672    ///
673    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
674    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
675    /// [`encoding_stats`]:
676    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
677    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
678        self.metadata_options.set_encoding_stats_as_mask(val);
679        self
680    }
681
682    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
683    ///
684    /// [`encoding_stats`]:
685    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
686    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
687        self.metadata_options.set_encoding_stats_policy(policy);
688        self
689    }
690
691    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
692    ///
693    /// [`statistics`]:
694    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
695    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
696        self.metadata_options.set_column_stats_policy(policy);
697        self
698    }
699
700    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
701    ///
702    /// [`size_statistics`]:
703    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
704    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
705        self.metadata_options.set_size_stats_policy(policy);
706        self
707    }
708
709    /// Provide the file decryption properties to use when reading encrypted parquet files.
710    ///
711    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
712    #[cfg(feature = "encryption")]
713    pub fn with_file_decryption_properties(
714        self,
715        file_decryption_properties: Arc<FileDecryptionProperties>,
716    ) -> Self {
717        Self {
718            file_decryption_properties: Some(file_decryption_properties),
719            ..self
720        }
721    }
722
723    /// Include virtual columns in the output.
724    ///
725    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
726    ///
727    /// # Example
728    /// ```
729    /// # use std::sync::Arc;
730    /// # use bytes::Bytes;
731    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
732    /// # use arrow_schema::{DataType, Field, Schema};
733    /// # use parquet::arrow::{ArrowWriter, RowNumber};
734    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
735    /// #
736    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
737    /// // Create a simple record batch with some data
738    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
739    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
740    ///
741    /// // Write the batch to an in-memory buffer
742    /// let mut file = Vec::new();
743    /// let mut writer = ArrowWriter::try_new(
744    ///     &mut file,
745    ///     batch.schema(),
746    ///     None
747    /// )?;
748    /// writer.write(&batch)?;
749    /// writer.close()?;
750    /// let file = Bytes::from(file);
751    ///
752    /// // Create a virtual column for row numbers
753    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
754    ///     .with_extension_type(RowNumber));
755    ///
756    /// // Configure options with virtual columns
757    /// let options = ArrowReaderOptions::new()
758    ///     .with_virtual_columns(vec![row_number_field])?;
759    ///
760    /// // Create a reader with the options
761    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
762    ///     file,
763    ///     options
764    /// )?
765    /// .build()?;
766    ///
767    /// // Read the batch - it will include both the original column and the virtual row_number column
768    /// let result_batch = reader.next().unwrap()?;
769    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
770    /// assert_eq!(result_batch.num_rows(), 3);
771    /// #
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
776        // Validate that all fields are virtual columns
777        for field in &virtual_columns {
778            if !is_virtual_column(field) {
779                return Err(ParquetError::General(format!(
780                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
781                    field.name()
782                )));
783            }
784        }
785        Ok(Self {
786            virtual_columns,
787            ..self
788        })
789    }
790
791    #[deprecated(
792        since = "57.2.0",
793        note = "Use `column_index_policy` or `offset_index_policy` instead"
794    )]
795    /// Returns whether page index reading is enabled.
796    ///
797    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
798    ///
799    /// This can be set via [`with_page_index`][Self::with_page_index] or
800    /// [`with_page_index_policy`][Self::with_page_index_policy].
801    pub fn page_index(&self) -> bool {
802        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
803    }
804
    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
    ///
    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    ///
    /// See [`PageIndexPolicy`] for the available policies.
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }
812
    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
    ///
    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    ///
    /// See [`PageIndexPolicy`] for the available policies.
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }
820
    /// Retrieve the currently set metadata decoding options.
    ///
    /// These control how the Parquet `ColumnMetaData` (statistics, encoding
    /// stats, size statistics) is decoded; see the corresponding `with_*`
    /// builder methods on this type.
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }
825
    /// Retrieve the currently set file decryption properties.
    ///
    /// Returns `None` if no decryption properties have been configured.
    ///
    /// This can be set via
    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
834}
835
/// The metadata necessary to construct a [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several arcs.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known a priori
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema
    pub(crate) schema: SchemaRef,
    /// The Parquet schema (root field)
    pub(crate) fields: Option<Arc<ParquetField>>,
}
859
860impl ArrowReaderMetadata {
861    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
862    /// and [`ChunkReader`]
863    ///
864    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
865    /// example of how this can be used
866    ///
867    /// # Notes
868    ///
869    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
870    /// `Self::metadata` is missing the page index, this function will attempt
871    /// to load the page index by making an object store request.
872    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
873        let metadata = ParquetMetaDataReader::new()
874            .with_column_index_policy(options.column_index)
875            .with_offset_index_policy(options.offset_index)
876            .with_metadata_options(Some(options.metadata_options.clone()));
877        #[cfg(feature = "encryption")]
878        let metadata = metadata.with_decryption_properties(
879            options.file_decryption_properties.as_ref().map(Arc::clone),
880        );
881        let metadata = metadata.parse_and_finish(reader)?;
882        Self::try_new(Arc::new(metadata), options)
883    }
884
885    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
886    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
887    ///
888    /// # Notes
889    ///
890    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
891    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
892    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
893        match options.supplied_schema {
894            Some(supplied_schema) => Self::with_supplied_schema(
895                metadata,
896                supplied_schema.clone(),
897                &options.virtual_columns,
898            ),
899            None => {
900                let kv_metadata = match options.skip_arrow_metadata {
901                    true => None,
902                    false => metadata.file_metadata().key_value_metadata(),
903                };
904
905                let (schema, fields) = parquet_to_arrow_schema_and_fields(
906                    metadata.file_metadata().schema_descr(),
907                    ProjectionMask::all(),
908                    kv_metadata,
909                    &options.virtual_columns,
910                )?;
911
912                Ok(Self {
913                    metadata,
914                    schema: Arc::new(schema),
915                    fields: fields.map(Arc::new),
916                })
917            }
918        }
919    }
920
921    fn with_supplied_schema(
922        metadata: Arc<ParquetMetaData>,
923        supplied_schema: SchemaRef,
924        virtual_columns: &[FieldRef],
925    ) -> Result<Self> {
926        let parquet_schema = metadata.file_metadata().schema_descr();
927        let field_levels = parquet_to_arrow_field_levels_with_virtual(
928            parquet_schema,
929            ProjectionMask::all(),
930            Some(supplied_schema.fields()),
931            virtual_columns,
932        )?;
933        let fields = field_levels.fields;
934        let inferred_len = fields.len();
935        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
936        // Ensure the supplied schema has the same number of columns as the parquet schema.
937        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
938        // different lengths, but we check here to be safe.
939        if inferred_len != supplied_len {
940            return Err(arrow_err!(format!(
941                "Incompatible supplied Arrow schema: expected {} columns received {}",
942                inferred_len, supplied_len
943            )));
944        }
945
946        let mut errors = Vec::new();
947
948        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
949
950        for (field1, field2) in field_iter {
951            if field1.data_type() != field2.data_type() {
952                errors.push(format!(
953                    "data type mismatch for field {}: requested {} but found {}",
954                    field1.name(),
955                    field1.data_type(),
956                    field2.data_type()
957                ));
958            }
959            if field1.is_nullable() != field2.is_nullable() {
960                errors.push(format!(
961                    "nullability mismatch for field {}: expected {:?} but found {:?}",
962                    field1.name(),
963                    field1.is_nullable(),
964                    field2.is_nullable()
965                ));
966            }
967            if field1.metadata() != field2.metadata() {
968                errors.push(format!(
969                    "metadata mismatch for field {}: expected {:?} but found {:?}",
970                    field1.name(),
971                    field1.metadata(),
972                    field2.metadata()
973                ));
974            }
975        }
976
977        if !errors.is_empty() {
978            let message = errors.join(", ");
979            return Err(ParquetError::ArrowError(format!(
980                "Incompatible supplied Arrow schema: {message}",
981            )));
982        }
983
984        Ok(Self {
985            metadata,
986            schema: supplied_schema,
987            fields: field_levels.levels.map(Arc::new),
988        })
989    }
990
991    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
992    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
993        &self.metadata
994    }
995
996    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
997    pub fn parquet_schema(&self) -> &SchemaDescriptor {
998        self.metadata.file_metadata().schema_descr()
999    }
1000
1001    /// Returns the arrow [`SchemaRef`] for this parquet file
1002    pub fn schema(&self) -> &SchemaRef {
1003        &self.schema
1004    }
1005}
1006
#[doc(hidden)]
// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);

// Delegate to the inner reader's `Debug` output, wrapped in the newtype's name.
impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}
1016
/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
///
/// This is [`ArrowReaderBuilder`] specialized for synchronous [`ChunkReader`] input.
///
/// # See Also
/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
/// * [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1024
impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // Build the reader from anything that implements `ChunkReader`
    /// // such as a `File`, or `Bytes`
    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // The builder has access to ParquetMetaData such
    /// // as the number and layout of row groups
    /// assert_eq!(builder.metadata().num_row_groups(), 1);
    /// // Call build to create the reader
    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
    /// // Read data
    /// while let Some(batch) = reader.next().transpose()? {
    ///     println!("Read {} rows", batch.num_rows());
    /// }
    /// # Ok::<(), parquet::errors::ParquetError>(())
    /// ```
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
    ///
    /// Use this method if you want to control the options for reading the
    /// [`ParquetMetaData`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
    ///
    /// Use this method if you already have [`ParquetMetaData`] for a file.
    /// This interface allows:
    ///
    /// 1. Loading metadata once and using it to create multiple builders with
    ///    potentially different settings or run on different threads
    ///
    /// 2. Using a cached copy of the metadata rather than re-reading it from the
    ///    file each time a reader is constructed.
    ///
    /// See the docs on [`ArrowReaderMetadata`] for more details
    ///
    /// # Example
    /// ```
    /// # use std::fs::metadata;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    /// # use parquet::arrow::ArrowWriter;
    /// #
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// #
    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
    ///
    /// // Should be able to read from both in parallel
    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
    /// ```
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Read bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This function should be called after other forms of pruning, such as
    /// projection and predicate pushdown, so bloom filters are only fetched
    /// for columns and row groups that are actually read.
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        // A column without a recorded bloom filter offset has no bloom filter.
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the total length is known, fetch header + bitset in one read;
        // otherwise fetch just enough bytes to decode the header.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future proof the singleton hash enum
            }
        }

        // With a known length the bitset is the remainder of the buffer already
        // read; otherwise a second read is required for the bitset itself.
        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a [`ParquetRecordBatchReader`]
    ///
    /// Note: this will eagerly evaluate any `RowFilter` before returning
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocate large buffer
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        // Default to scanning all row groups when none were selected explicitly.
        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Update selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if we have ruled out all rows
                if !plan_builder.selects_any() {
                    break;
                }

                // Each predicate is evaluated eagerly here against its own
                // projection, narrowing the row selection for the next one.
                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        // Reader for the final output projection.
        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        // Apply offset/limit on top of the (possibly filtered) selection.
        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}
1238
/// A [`RowGroups`] implementation backed by a synchronous [`ChunkReader`],
/// restricted to a subset of the file's row groups.
struct ReaderRowGroups<T: ChunkReader> {
    /// Underlying reader shared with the page iterators it creates
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    /// Optional list of row group indices to scan
    row_groups: Vec<usize>,
}
1246
1247impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1248    fn num_rows(&self) -> usize {
1249        let meta = self.metadata.row_groups();
1250        self.row_groups
1251            .iter()
1252            .map(|x| meta[*x].num_rows() as usize)
1253            .sum()
1254    }
1255
1256    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1257        Ok(Box::new(ReaderPageIterator {
1258            column_idx: i,
1259            reader: self.reader.clone(),
1260            metadata: self.metadata.clone(),
1261            row_groups: self.row_groups.clone().into_iter(),
1262        }))
1263    }
1264
1265    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1266        Box::new(
1267            self.row_groups
1268                .iter()
1269                .map(move |i| self.metadata.row_group(*i)),
1270        )
1271    }
1272
1273    fn metadata(&self) -> &ParquetMetaData {
1274        self.metadata.as_ref()
1275    }
1276}
1277
/// Iterates over the page readers for a single column across a sequence of
/// row groups, reading from a shared [`ChunkReader`].
struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    /// Leaf column index this iterator serves pages for
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}
1284
1285impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1286    /// Return the next SerializedPageReader
1287    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1288        let rg = self.metadata.row_group(rg_idx);
1289        let column_chunk_metadata = rg.column(self.column_idx);
1290        let offset_index = self.metadata.offset_index();
1291        // `offset_index` may not exist and `i[rg_idx]` will be empty.
1292        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
1293        let page_locations = offset_index
1294            .filter(|i| !i[rg_idx].is_empty())
1295            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1296        let total_rows = rg.num_rows() as usize;
1297        let reader = self.reader.clone();
1298
1299        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1300            .add_crypto_context(
1301                rg_idx,
1302                self.column_idx,
1303                self.metadata.as_ref(),
1304                column_chunk_metadata,
1305            )
1306    }
1307}
1308
1309impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1310    type Item = Result<Box<dyn PageReader>>;
1311
1312    fn next(&mut self) -> Option<Self::Item> {
1313        let rg_idx = self.row_groups.next()?;
1314        let page_reader = self
1315            .next_page_reader(rg_idx)
1316            .map(|page_reader| Box::new(page_reader) as _);
1317        Some(page_reader)
1318    }
1319}
1320
// Marker impl: `PageIterator` adds no methods beyond `Iterator`, which is
// implemented above.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1322
/// Reads Parquet data as Arrow [`RecordBatch`]es
///
/// This struct implements the [`RecordBatchReader`] trait and is an
/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
///
/// Typically, either reads from a file or an in memory buffer [`Bytes`]
///
/// Created by [`ParquetRecordBatchReaderBuilder`]
///
/// [`Bytes`]: bytes::Bytes
pub struct ParquetRecordBatchReader {
    /// Decodes the projected columns into a single struct-typed array per batch
    array_reader: Box<dyn ArrayReader>,
    /// Schema of the emitted batches
    schema: SchemaRef,
    /// Batch size and row selection state driving `next_inner`
    read_plan: ReadPlan,
}
1338
1339impl Debug for ParquetRecordBatchReader {
1340    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1341        f.debug_struct("ParquetRecordBatchReader")
1342            .field("array_reader", &"...")
1343            .field("schema", &self.schema)
1344            .field("read_plan", &self.read_plan)
1345            .finish()
1346    }
1347}
1348
1349impl Iterator for ParquetRecordBatchReader {
1350    type Item = Result<RecordBatch, ArrowError>;
1351
1352    fn next(&mut self) -> Option<Self::Item> {
1353        self.next_inner()
1354            .map_err(|arrow_err| arrow_err.into())
1355            .transpose()
1356    }
1357}
1358
1359impl ParquetRecordBatchReader {
1360    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1361    /// has reached the end of the file.
1362    ///
1363    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1364    /// simplify error handling with `?`
1365    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1366        let mut read_records = 0;
1367        let batch_size = self.batch_size();
1368        if batch_size == 0 {
1369            return Ok(None);
1370        }
1371        match self.read_plan.row_selection_cursor_mut() {
1372            RowSelectionCursor::Mask(mask_cursor) => {
1373                // Stream the record batch reader using contiguous segments of the selection
1374                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1375                while !mask_cursor.is_empty() {
1376                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1377                        return Ok(None);
1378                    };
1379
1380                    if mask_chunk.initial_skip > 0 {
1381                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1382                        if skipped != mask_chunk.initial_skip {
1383                            return Err(general_err!(
1384                                "failed to skip rows, expected {}, got {}",
1385                                mask_chunk.initial_skip,
1386                                skipped
1387                            ));
1388                        }
1389                    }
1390
1391                    if mask_chunk.chunk_rows == 0 {
1392                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1393                            return Ok(None);
1394                        }
1395                        continue;
1396                    }
1397
1398                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1399
1400                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1401                    if read == 0 {
1402                        return Err(general_err!(
1403                            "reached end of column while expecting {} rows",
1404                            mask_chunk.chunk_rows
1405                        ));
1406                    }
1407                    if read != mask_chunk.chunk_rows {
1408                        return Err(general_err!(
1409                            "insufficient rows read from array reader - expected {}, got {}",
1410                            mask_chunk.chunk_rows,
1411                            read
1412                        ));
1413                    }
1414
1415                    let array = self.array_reader.consume_batch()?;
1416                    // The column reader exposes the projection as a struct array; convert this
1417                    // into a record batch before applying the boolean filter mask.
1418                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1419                        ArrowError::ParquetError(
1420                            "Struct array reader should return struct array".to_string(),
1421                        )
1422                    })?;
1423
1424                    let filtered_batch =
1425                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1426
1427                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1428                        return Err(general_err!(
1429                            "filtered rows mismatch selection - expected {}, got {}",
1430                            mask_chunk.selected_rows,
1431                            filtered_batch.num_rows()
1432                        ));
1433                    }
1434
1435                    if filtered_batch.num_rows() == 0 {
1436                        continue;
1437                    }
1438
1439                    return Ok(Some(filtered_batch));
1440                }
1441            }
1442            RowSelectionCursor::Selectors(selectors_cursor) => {
1443                while read_records < batch_size && !selectors_cursor.is_empty() {
1444                    let front = selectors_cursor.next_selector();
1445                    if front.skip {
1446                        let skipped = self.array_reader.skip_records(front.row_count)?;
1447
1448                        if skipped != front.row_count {
1449                            return Err(general_err!(
1450                                "failed to skip rows, expected {}, got {}",
1451                                front.row_count,
1452                                skipped
1453                            ));
1454                        }
1455                        continue;
1456                    }
1457
                    // Currently, when RowSelectors with row_count = 0 are included, it's
                    // interpreted as the end of the reader. The fix is to skip such entries.
                    // See https://github.com/apache/arrow-rs/issues/2669
1460                    if front.row_count == 0 {
1461                        continue;
1462                    }
1463
1464                    // try to read record
1465                    let need_read = batch_size - read_records;
1466                    let to_read = match front.row_count.checked_sub(need_read) {
1467                        Some(remaining) if remaining != 0 => {
1468                            // if page row count less than batch_size we must set batch size to page row count.
1469                            // add check avoid dead loop
1470                            selectors_cursor.return_selector(RowSelector::select(remaining));
1471                            need_read
1472                        }
1473                        _ => front.row_count,
1474                    };
1475                    match self.array_reader.read_records(to_read)? {
1476                        0 => break,
1477                        rec => read_records += rec,
1478                    };
1479                }
1480            }
1481            RowSelectionCursor::All => {
1482                self.array_reader.read_records(batch_size)?;
1483            }
1484        };
1485
1486        let array = self.array_reader.consume_batch()?;
1487        let struct_array = array.as_struct_opt().ok_or_else(|| {
1488            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1489        })?;
1490
1491        Ok(if struct_array.len() > 0 {
1492            Some(RecordBatch::from(struct_array))
1493        } else {
1494            None
1495        })
1496    }
1497}
1498
1499impl RecordBatchReader for ParquetRecordBatchReader {
1500    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1501    ///
1502    /// Note that the schema metadata will be stripped here. See
1503    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1504    fn schema(&self) -> SchemaRef {
1505        self.schema.clone()
1506    }
1507}
1508
1509impl ParquetRecordBatchReader {
1510    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1511    ///
1512    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1513    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1514        ParquetRecordBatchReaderBuilder::try_new(reader)?
1515            .with_batch_size(batch_size)
1516            .build()
1517    }
1518
1519    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1520    ///
1521    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1522    /// higher-level interface for reading parquet data from a file
1523    pub fn try_new_with_row_groups(
1524        levels: &FieldLevels,
1525        row_groups: &dyn RowGroups,
1526        batch_size: usize,
1527        selection: Option<RowSelection>,
1528    ) -> Result<Self> {
1529        // note metrics are not supported in this API
1530        let metrics = ArrowReaderMetrics::disabled();
1531        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1532            .with_parquet_metadata(row_groups.metadata())
1533            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1534
1535        let read_plan = ReadPlanBuilder::new(batch_size)
1536            .with_selection(selection)
1537            .build();
1538
1539        Ok(Self {
1540            array_reader,
1541            schema: Arc::new(Schema::new(levels.fields.clone())),
1542            read_plan,
1543        })
1544    }
1545
1546    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1547    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1548    /// all rows will be returned
1549    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1550        let schema = match array_reader.get_data_type() {
1551            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1552            _ => unreachable!("Struct array reader's data type is not struct!"),
1553        };
1554
1555        Self {
1556            array_reader,
1557            schema: Arc::new(schema),
1558            read_plan,
1559        }
1560    }
1561
1562    #[inline(always)]
1563    pub(crate) fn batch_size(&self) -> usize {
1564        self.read_plan.batch_size()
1565    }
1566}
1567
1568#[cfg(test)]
1569pub(crate) mod tests {
1570    use std::cmp::min;
1571    use std::collections::{HashMap, VecDeque};
1572    use std::fmt::Formatter;
1573    use std::fs::File;
1574    use std::io::Seek;
1575    use std::path::PathBuf;
1576    use std::sync::Arc;
1577
1578    use rand::rngs::StdRng;
1579    use rand::{Rng, RngCore, SeedableRng, random, rng};
1580    use tempfile::tempfile;
1581
1582    use crate::arrow::arrow_reader::{
1583        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1584        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1585    };
1586    use crate::arrow::schema::{
1587        add_encoded_arrow_schema_to_metadata,
1588        virtual_type::{RowGroupIndex, RowNumber},
1589    };
1590    use crate::arrow::{ArrowWriter, ProjectionMask};
1591    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1592    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1593    use crate::data_type::{
1594        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1595        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1596    };
1597    use crate::errors::Result;
1598    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1599    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1600    use crate::file::writer::SerializedFileWriter;
1601    use crate::schema::parser::parse_message_type;
1602    use crate::schema::types::{Type, TypePtr};
1603    use crate::util::test_common::rand_gen::RandGen;
1604    use arrow_array::builder::*;
1605    use arrow_array::cast::AsArray;
1606    use arrow_array::types::{
1607        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1608        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1609        Time64MicrosecondType,
1610    };
1611    use arrow_array::*;
1612    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1613    use arrow_data::{ArrayData, ArrayDataBuilder};
1614    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1615    use arrow_select::concat::concat_batches;
1616    use bytes::Bytes;
1617    use half::f16;
1618    use num_traits::PrimInt;
1619
1620    #[test]
1621    fn test_arrow_reader_all_columns() {
1622        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1623
1624        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1625        let original_schema = Arc::clone(builder.schema());
1626        let reader = builder.build().unwrap();
1627
1628        // Verify that the schema was correctly parsed
1629        assert_eq!(original_schema.fields(), reader.schema().fields());
1630    }
1631
1632    #[test]
1633    fn test_reuse_schema() {
1634        let file = get_test_file("parquet/alltypes-java.parquet");
1635
1636        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1637        let expected = builder.metadata;
1638        let schema = expected.file_metadata().schema_descr_ptr();
1639
1640        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1641        let builder =
1642            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1643
1644        // Verify that the metadata matches
1645        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1646    }
1647
1648    #[test]
1649    fn test_page_encoding_stats_mask() {
1650        let testdata = arrow::util::test_util::parquet_test_data();
1651        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1652        let file = File::open(path).unwrap();
1653
1654        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1655        let builder =
1656            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1657
1658        let row_group_metadata = builder.metadata.row_group(0);
1659
1660        // test page encoding stats
1661        let page_encoding_stats = row_group_metadata
1662            .column(0)
1663            .page_encoding_stats_mask()
1664            .unwrap();
1665        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1666        let page_encoding_stats = row_group_metadata
1667            .column(2)
1668            .page_encoding_stats_mask()
1669            .unwrap();
1670        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1671    }
1672
1673    #[test]
1674    fn test_stats_stats_skipped() {
1675        let testdata = arrow::util::test_util::parquet_test_data();
1676        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1677        let file = File::open(path).unwrap();
1678
1679        // test skipping all
1680        let arrow_options = ArrowReaderOptions::new()
1681            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1682            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1683        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1684            file.try_clone().unwrap(),
1685            arrow_options,
1686        )
1687        .unwrap();
1688
1689        let row_group_metadata = builder.metadata.row_group(0);
1690        for column in row_group_metadata.columns() {
1691            assert!(column.page_encoding_stats().is_none());
1692            assert!(column.page_encoding_stats_mask().is_none());
1693            assert!(column.statistics().is_none());
1694        }
1695
1696        // test skipping all but one column and converting to mask
1697        let arrow_options = ArrowReaderOptions::new()
1698            .with_encoding_stats_as_mask(true)
1699            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1700            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1701        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1702            file.try_clone().unwrap(),
1703            arrow_options,
1704        )
1705        .unwrap();
1706
1707        let row_group_metadata = builder.metadata.row_group(0);
1708        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1709            assert!(column.page_encoding_stats().is_none());
1710            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1711            assert_eq!(column.statistics().is_some(), idx == 0);
1712        }
1713    }
1714
1715    #[test]
1716    fn test_size_stats_stats_skipped() {
1717        let testdata = arrow::util::test_util::parquet_test_data();
1718        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1719        let file = File::open(path).unwrap();
1720
1721        // test skipping all
1722        let arrow_options =
1723            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1724        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1725            file.try_clone().unwrap(),
1726            arrow_options,
1727        )
1728        .unwrap();
1729
1730        let row_group_metadata = builder.metadata.row_group(0);
1731        for column in row_group_metadata.columns() {
1732            assert!(column.repetition_level_histogram().is_none());
1733            assert!(column.definition_level_histogram().is_none());
1734            assert!(column.unencoded_byte_array_data_bytes().is_none());
1735        }
1736
1737        // test skipping all but one column and converting to mask
1738        let arrow_options = ArrowReaderOptions::new()
1739            .with_encoding_stats_as_mask(true)
1740            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1741        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1742            file.try_clone().unwrap(),
1743            arrow_options,
1744        )
1745        .unwrap();
1746
1747        let row_group_metadata = builder.metadata.row_group(0);
1748        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1749            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1750            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1751            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1752        }
1753    }
1754
1755    #[test]
1756    fn test_arrow_reader_single_column() {
1757        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1758
1759        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1760        let original_schema = Arc::clone(builder.schema());
1761
1762        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1763        let reader = builder.with_projection(mask).build().unwrap();
1764
1765        // Verify that the schema was correctly parsed
1766        assert_eq!(1, reader.schema().fields().len());
1767        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1768    }
1769
1770    #[test]
1771    fn test_arrow_reader_single_column_by_name() {
1772        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1773
1774        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1775        let original_schema = Arc::clone(builder.schema());
1776
1777        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1778        let reader = builder.with_projection(mask).build().unwrap();
1779
1780        // Verify that the schema was correctly parsed
1781        assert_eq!(1, reader.schema().fields().len());
1782        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1783    }
1784
1785    #[test]
1786    fn test_null_column_reader_test() {
1787        let mut file = tempfile::tempfile().unwrap();
1788
1789        let schema = "
1790            message message {
1791                OPTIONAL INT32 int32;
1792            }
1793        ";
1794        let schema = Arc::new(parse_message_type(schema).unwrap());
1795
1796        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1797        generate_single_column_file_with_data::<Int32Type>(
1798            &[vec![], vec![]],
1799            Some(&def_levels),
1800            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1801            schema,
1802            Some(Field::new("int32", ArrowDataType::Null, true)),
1803            &Default::default(),
1804        )
1805        .unwrap();
1806
1807        file.rewind().unwrap();
1808
1809        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1810        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1811
1812        assert_eq!(batches.len(), 4);
1813        for batch in &batches[0..3] {
1814            assert_eq!(batch.num_rows(), 2);
1815            assert_eq!(batch.num_columns(), 1);
1816            assert_eq!(batch.column(0).null_count(), 2);
1817        }
1818
1819        assert_eq!(batches[3].num_rows(), 1);
1820        assert_eq!(batches[3].num_columns(), 1);
1821        assert_eq!(batches[3].column(0).null_count(), 1);
1822    }
1823
1824    #[test]
1825    fn test_primitive_single_column_reader_test() {
1826        run_single_column_reader_tests::<BoolType, _, BoolType>(
1827            2,
1828            ConvertedType::NONE,
1829            None,
1830            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1831            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1832        );
1833        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1834            2,
1835            ConvertedType::NONE,
1836            None,
1837            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1838            &[
1839                Encoding::PLAIN,
1840                Encoding::RLE_DICTIONARY,
1841                Encoding::DELTA_BINARY_PACKED,
1842                Encoding::BYTE_STREAM_SPLIT,
1843            ],
1844        );
1845        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1846            2,
1847            ConvertedType::NONE,
1848            None,
1849            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1850            &[
1851                Encoding::PLAIN,
1852                Encoding::RLE_DICTIONARY,
1853                Encoding::DELTA_BINARY_PACKED,
1854                Encoding::BYTE_STREAM_SPLIT,
1855            ],
1856        );
1857        run_single_column_reader_tests::<FloatType, _, FloatType>(
1858            2,
1859            ConvertedType::NONE,
1860            None,
1861            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1862            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1863        );
1864    }
1865
1866    #[test]
1867    fn test_unsigned_primitive_single_column_reader_test() {
1868        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1869            2,
1870            ConvertedType::UINT_32,
1871            Some(ArrowDataType::UInt32),
1872            |vals| {
1873                Arc::new(UInt32Array::from_iter(
1874                    vals.iter().map(|x| x.map(|x| x as u32)),
1875                ))
1876            },
1877            &[
1878                Encoding::PLAIN,
1879                Encoding::RLE_DICTIONARY,
1880                Encoding::DELTA_BINARY_PACKED,
1881            ],
1882        );
1883        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1884            2,
1885            ConvertedType::UINT_64,
1886            Some(ArrowDataType::UInt64),
1887            |vals| {
1888                Arc::new(UInt64Array::from_iter(
1889                    vals.iter().map(|x| x.map(|x| x as u64)),
1890                ))
1891            },
1892            &[
1893                Encoding::PLAIN,
1894                Encoding::RLE_DICTIONARY,
1895                Encoding::DELTA_BINARY_PACKED,
1896            ],
1897        );
1898    }
1899
1900    #[test]
1901    fn test_unsigned_roundtrip() {
1902        let schema = Arc::new(Schema::new(vec![
1903            Field::new("uint32", ArrowDataType::UInt32, true),
1904            Field::new("uint64", ArrowDataType::UInt64, true),
1905        ]));
1906
1907        let mut buf = Vec::with_capacity(1024);
1908        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1909
1910        let original = RecordBatch::try_new(
1911            schema,
1912            vec![
1913                Arc::new(UInt32Array::from_iter_values([
1914                    0,
1915                    i32::MAX as u32,
1916                    u32::MAX,
1917                ])),
1918                Arc::new(UInt64Array::from_iter_values([
1919                    0,
1920                    i64::MAX as u64,
1921                    u64::MAX,
1922                ])),
1923            ],
1924        )
1925        .unwrap();
1926
1927        writer.write(&original).unwrap();
1928        writer.close().unwrap();
1929
1930        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1931        let ret = reader.next().unwrap().unwrap();
1932        assert_eq!(ret, original);
1933
1934        // Check they can be downcast to the correct type
1935        ret.column(0)
1936            .as_any()
1937            .downcast_ref::<UInt32Array>()
1938            .unwrap();
1939
1940        ret.column(1)
1941            .as_any()
1942            .downcast_ref::<UInt64Array>()
1943            .unwrap();
1944    }
1945
1946    #[test]
1947    fn test_float16_roundtrip() -> Result<()> {
1948        let schema = Arc::new(Schema::new(vec![
1949            Field::new("float16", ArrowDataType::Float16, false),
1950            Field::new("float16-nullable", ArrowDataType::Float16, true),
1951        ]));
1952
1953        let mut buf = Vec::with_capacity(1024);
1954        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1955
1956        let original = RecordBatch::try_new(
1957            schema,
1958            vec![
1959                Arc::new(Float16Array::from_iter_values([
1960                    f16::EPSILON,
1961                    f16::MIN,
1962                    f16::MAX,
1963                    f16::NAN,
1964                    f16::INFINITY,
1965                    f16::NEG_INFINITY,
1966                    f16::ONE,
1967                    f16::NEG_ONE,
1968                    f16::ZERO,
1969                    f16::NEG_ZERO,
1970                    f16::E,
1971                    f16::PI,
1972                    f16::FRAC_1_PI,
1973                ])),
1974                Arc::new(Float16Array::from(vec![
1975                    None,
1976                    None,
1977                    None,
1978                    Some(f16::NAN),
1979                    Some(f16::INFINITY),
1980                    Some(f16::NEG_INFINITY),
1981                    None,
1982                    None,
1983                    None,
1984                    None,
1985                    None,
1986                    None,
1987                    Some(f16::FRAC_1_PI),
1988                ])),
1989            ],
1990        )?;
1991
1992        writer.write(&original)?;
1993        writer.close()?;
1994
1995        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1996        let ret = reader.next().unwrap()?;
1997        assert_eq!(ret, original);
1998
1999        // Ensure can be downcast to the correct type
2000        ret.column(0).as_primitive::<Float16Type>();
2001        ret.column(1).as_primitive::<Float16Type>();
2002
2003        Ok(())
2004    }
2005
2006    #[test]
2007    fn test_time_utc_roundtrip() -> Result<()> {
2008        let schema = Arc::new(Schema::new(vec![
2009            Field::new(
2010                "time_millis",
2011                ArrowDataType::Time32(TimeUnit::Millisecond),
2012                true,
2013            )
2014            .with_metadata(HashMap::from_iter(vec![(
2015                "adjusted_to_utc".to_string(),
2016                "".to_string(),
2017            )])),
2018            Field::new(
2019                "time_micros",
2020                ArrowDataType::Time64(TimeUnit::Microsecond),
2021                true,
2022            )
2023            .with_metadata(HashMap::from_iter(vec![(
2024                "adjusted_to_utc".to_string(),
2025                "".to_string(),
2026            )])),
2027        ]));
2028
2029        let mut buf = Vec::with_capacity(1024);
2030        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2031
2032        let original = RecordBatch::try_new(
2033            schema,
2034            vec![
2035                Arc::new(Time32MillisecondArray::from(vec![
2036                    Some(-1),
2037                    Some(0),
2038                    Some(86_399_000),
2039                    Some(86_400_000),
2040                    Some(86_401_000),
2041                    None,
2042                ])),
2043                Arc::new(Time64MicrosecondArray::from(vec![
2044                    Some(-1),
2045                    Some(0),
2046                    Some(86_399 * 1_000_000),
2047                    Some(86_400 * 1_000_000),
2048                    Some(86_401 * 1_000_000),
2049                    None,
2050                ])),
2051            ],
2052        )?;
2053
2054        writer.write(&original)?;
2055        writer.close()?;
2056
2057        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2058        let ret = reader.next().unwrap()?;
2059        assert_eq!(ret, original);
2060
2061        // Ensure can be downcast to the correct type
2062        ret.column(0).as_primitive::<Time32MillisecondType>();
2063        ret.column(1).as_primitive::<Time64MicrosecondType>();
2064
2065        Ok(())
2066    }
2067
2068    #[test]
2069    fn test_date32_roundtrip() -> Result<()> {
2070        use arrow_array::Date32Array;
2071
2072        let schema = Arc::new(Schema::new(vec![Field::new(
2073            "date32",
2074            ArrowDataType::Date32,
2075            false,
2076        )]));
2077
2078        let mut buf = Vec::with_capacity(1024);
2079
2080        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2081
2082        let original = RecordBatch::try_new(
2083            schema,
2084            vec![Arc::new(Date32Array::from(vec![
2085                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2086            ]))],
2087        )?;
2088
2089        writer.write(&original)?;
2090        writer.close()?;
2091
2092        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2093        let ret = reader.next().unwrap()?;
2094        assert_eq!(ret, original);
2095
2096        // Ensure can be downcast to the correct type
2097        ret.column(0).as_primitive::<Date32Type>();
2098
2099        Ok(())
2100    }
2101
2102    #[test]
2103    fn test_date64_roundtrip() -> Result<()> {
2104        use arrow_array::Date64Array;
2105
2106        let schema = Arc::new(Schema::new(vec![
2107            Field::new("small-date64", ArrowDataType::Date64, false),
2108            Field::new("big-date64", ArrowDataType::Date64, false),
2109            Field::new("invalid-date64", ArrowDataType::Date64, false),
2110        ]));
2111
2112        let mut default_buf = Vec::with_capacity(1024);
2113        let mut coerce_buf = Vec::with_capacity(1024);
2114
2115        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2116
2117        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2118        let mut coerce_writer =
2119            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2120
2121        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2122
2123        let original = RecordBatch::try_new(
2124            schema,
2125            vec![
2126                // small-date64
2127                Arc::new(Date64Array::from(vec![
2128                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2129                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2130                    0,
2131                    1_000 * NUM_MILLISECONDS_IN_DAY,
2132                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2133                ])),
2134                // big-date64
2135                Arc::new(Date64Array::from(vec![
2136                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2137                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2138                    0,
2139                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2140                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2141                ])),
2142                // invalid-date64
2143                Arc::new(Date64Array::from(vec![
2144                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2145                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2146                    1,
2147                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2148                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2149                ])),
2150            ],
2151        )?;
2152
2153        default_writer.write(&original)?;
2154        coerce_writer.write(&original)?;
2155
2156        default_writer.close()?;
2157        coerce_writer.close()?;
2158
2159        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2160        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2161
2162        let default_ret = default_reader.next().unwrap()?;
2163        let coerce_ret = coerce_reader.next().unwrap()?;
2164
2165        // Roundtrip should be successful when default writer used
2166        assert_eq!(default_ret, original);
2167
2168        // Only small-date64 should roundtrip successfully when coerce_types writer is used
2169        assert_eq!(coerce_ret.column(0), original.column(0));
2170        assert_ne!(coerce_ret.column(1), original.column(1));
2171        assert_ne!(coerce_ret.column(2), original.column(2));
2172
2173        // Ensure both can be downcast to the correct type
2174        default_ret.column(0).as_primitive::<Date64Type>();
2175        coerce_ret.column(0).as_primitive::<Date64Type>();
2176
2177        Ok(())
2178    }
2179    struct RandFixedLenGen {}
2180
2181    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2182        fn r#gen(len: i32) -> FixedLenByteArray {
2183            let mut v = vec![0u8; len as usize];
2184            rng().fill_bytes(&mut v);
2185            ByteArray::from(v).into()
2186        }
2187    }
2188
2189    #[test]
2190    fn test_fixed_length_binary_column_reader() {
2191        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2192            20,
2193            ConvertedType::NONE,
2194            None,
2195            |vals| {
2196                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2197                for val in vals {
2198                    match val {
2199                        Some(b) => builder.append_value(b).unwrap(),
2200                        None => builder.append_null(),
2201                    }
2202                }
2203                Arc::new(builder.finish())
2204            },
2205            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2206        );
2207    }
2208
2209    #[test]
2210    fn test_interval_day_time_column_reader() {
2211        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2212            12,
2213            ConvertedType::INTERVAL,
2214            None,
2215            |vals| {
2216                Arc::new(
2217                    vals.iter()
2218                        .map(|x| {
2219                            x.as_ref().map(|b| IntervalDayTime {
2220                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2221                                milliseconds: i32::from_le_bytes(
2222                                    b.as_ref()[8..12].try_into().unwrap(),
2223                                ),
2224                            })
2225                        })
2226                        .collect::<IntervalDayTimeArray>(),
2227                )
2228            },
2229            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2230        );
2231    }
2232
    #[test]
    fn test_int96_single_column_reader_test() {
        // Exercises INT96 timestamp reading: each entry pairs an optional Arrow
        // type hint with the conversion that builds the expected array at the
        // corresponding resolution.
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // Test without a specified ArrowType hint.
            (None, |vals: &[Option<Int96>]| {
                // Default interpretation is nanosecond resolution.
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Test other TimeUnits as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Test another timezone with TimeUnit as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        // Run the full single-column suite once per (hint, converter) pair.
        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }
2307
2308    #[test]
2309    fn test_int96_from_spark_file_with_provided_schema() {
2310        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2311        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2312        // microsecond resolution for the Parquet reader to interpret these values correctly.
2313        use arrow_schema::DataType::Timestamp;
2314        let test_data = arrow::util::test_util::parquet_test_data();
2315        let path = format!("{test_data}/int96_from_spark.parquet");
2316        let file = File::open(path).unwrap();
2317
2318        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2319            "a",
2320            Timestamp(TimeUnit::Microsecond, None),
2321            true,
2322        )]));
2323        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2324
2325        let mut record_reader =
2326            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2327                .unwrap()
2328                .build()
2329                .unwrap();
2330
2331        let batch = record_reader.next().unwrap().unwrap();
2332        assert_eq!(batch.num_columns(), 1);
2333        let column = batch.column(0);
2334        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2335
2336        let expected = Arc::new(Int64Array::from(vec![
2337            Some(1704141296123456),
2338            Some(1704070800000000),
2339            Some(253402225200000000),
2340            Some(1735599600000000),
2341            None,
2342            Some(9089380393200000000),
2343        ]));
2344
2345        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2346        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2347        // anyway, so this should be a zero-copy non-modifying cast.
2348
2349        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2350        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2351
2352        assert_eq!(casted_timestamps.len(), expected.len());
2353
2354        casted_timestamps
2355            .iter()
2356            .zip(expected.iter())
2357            .for_each(|(lhs, rhs)| {
2358                assert_eq!(lhs, rhs);
2359            });
2360    }
2361
2362    #[test]
2363    fn test_int96_from_spark_file_without_provided_schema() {
2364        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2365        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2366        // values when read as nanosecond resolution overflow and result in garbage values.
2367        use arrow_schema::DataType::Timestamp;
2368        let test_data = arrow::util::test_util::parquet_test_data();
2369        let path = format!("{test_data}/int96_from_spark.parquet");
2370        let file = File::open(path).unwrap();
2371
2372        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2373            .unwrap()
2374            .build()
2375            .unwrap();
2376
2377        let batch = record_reader.next().unwrap().unwrap();
2378        assert_eq!(batch.num_columns(), 1);
2379        let column = batch.column(0);
2380        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2381
2382        let expected = Arc::new(Int64Array::from(vec![
2383            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2384            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2385            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2386            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2387            None,
2388            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2389        ]));
2390
2391        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2392        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2393        // anyway, so this should be a zero-copy non-modifying cast.
2394
2395        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2396        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2397
2398        assert_eq!(casted_timestamps.len(), expected.len());
2399
2400        casted_timestamps
2401            .iter()
2402            .zip(expected.iter())
2403            .for_each(|(lhs, rhs)| {
2404                assert_eq!(lhs, rhs);
2405            });
2406    }
2407
2408    struct RandUtf8Gen {}
2409
2410    impl RandGen<ByteArrayType> for RandUtf8Gen {
2411        fn r#gen(len: i32) -> ByteArray {
2412            Int32Type::r#gen(len).to_string().as_str().into()
2413        }
2414    }
2415
    #[test]
    fn test_utf8_single_column_reader_test() {
        // Builds a (Large)StringArray of the requested offset width from raw byte arrays.
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // Without a UTF8 annotation the data reads back as plain binary.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // UTF8 annotation, no hint: default Utf8.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        // Explicit Utf8 hint.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        // LargeUtf8 hint uses 64-bit offsets.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // 8-bit dictionary key types only fit a limited number of distinct values.
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                // Cannot run full test suite as keys overflow, run small test instead
                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Wider key types can hold every dictionary entry, so the full suite runs.
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }
2531
    #[test]
    fn test_decimal_nullable_struct() {
        // Round-trips a nullable struct<decimals: decimal256> through parquet and
        // verifies batch-of-3 reads match slices of what was written.
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // Validity 0b11101111 marks struct row index 4 as null.
        let data = ArrayDataBuilder::new(ArrowType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // 8 rows read in batches of 3: [0..3), [3..6), [6..8)
        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }
2567
2568    #[test]
2569    fn test_int32_nullable_struct() {
2570        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2571        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2572            "int32",
2573            int32.data_type().clone(),
2574            false,
2575        )])))
2576        .len(8)
2577        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2578        .child_data(vec![int32.into_data()])
2579        .build()
2580        .unwrap();
2581
2582        let written =
2583            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2584                .unwrap();
2585
2586        let mut buffer = Vec::with_capacity(1024);
2587        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2588        writer.write(&written).unwrap();
2589        writer.close().unwrap();
2590
2591        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2592            .unwrap()
2593            .collect::<Result<Vec<_>, _>>()
2594            .unwrap();
2595
2596        assert_eq!(&written.slice(0, 3), &read[0]);
2597        assert_eq!(&written.slice(3, 3), &read[1]);
2598        assert_eq!(&written.slice(6, 2), &read[2]);
2599    }
2600
    #[test]
    fn test_decimal_list() {
        // Round-trips a list<decimal128> with empty lists and null entries.
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        // Offsets delimit each list's values; validity 0b01010111 nulls rows 3 and 5.
        let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // 7 rows read in batches of 3: [0..3), [3..6), [6..7)
        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
2635
2636    #[test]
2637    fn test_read_decimal_file() {
2638        use arrow_array::Decimal128Array;
2639        let testdata = arrow::util::test_util::parquet_test_data();
2640        let file_variants = vec![
2641            ("byte_array", 4),
2642            ("fixed_length", 25),
2643            ("int32", 4),
2644            ("int64", 10),
2645        ];
2646        for (prefix, target_precision) in file_variants {
2647            let path = format!("{testdata}/{prefix}_decimal.parquet");
2648            let file = File::open(path).unwrap();
2649            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2650
2651            let batch = record_reader.next().unwrap().unwrap();
2652            assert_eq!(batch.num_rows(), 24);
2653            let col = batch
2654                .column(0)
2655                .as_any()
2656                .downcast_ref::<Decimal128Array>()
2657                .unwrap();
2658
2659            let expected = 1..25;
2660
2661            assert_eq!(col.precision(), target_precision);
2662            assert_eq!(col.scale(), 2);
2663
2664            for (i, v) in expected.enumerate() {
2665                assert_eq!(col.value(i), v * 100_i128);
2666            }
2667        }
2668    }
2669
    #[test]
    fn test_read_float16_nonzeros_file() {
        // Verifies f16 decoding of special values: signed zeros, NaN, and negatives.
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        // see https://github.com/apache/parquet-testing/pull/40
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        // Expected contents: [null, 1.0, -2.0, NaN, +0.0, -1.0, -0.0, 2.0]
        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }
2701
2702    #[test]
2703    fn test_read_float16_zeros_file() {
2704        use arrow_array::Float16Array;
2705        let testdata = arrow::util::test_util::parquet_test_data();
2706        // see https://github.com/apache/parquet-testing/pull/40
2707        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2708        let file = File::open(path).unwrap();
2709        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2710
2711        let batch = record_reader.next().unwrap().unwrap();
2712        assert_eq!(batch.num_rows(), 3);
2713        let col = batch
2714            .column(0)
2715            .as_any()
2716            .downcast_ref::<Float16Array>()
2717            .unwrap();
2718
2719        assert_eq!(col.null_count(), 1);
2720        assert!(col.is_null(0));
2721        assert_eq!(col.value(1), f16::ZERO);
2722        assert!(col.value(1).is_sign_positive());
2723        assert!(col.value(2).is_nan());
2724    }
2725
2726    #[test]
2727    fn test_read_float32_float64_byte_stream_split() {
2728        let path = format!(
2729            "{}/byte_stream_split.zstd.parquet",
2730            arrow::util::test_util::parquet_test_data(),
2731        );
2732        let file = File::open(path).unwrap();
2733        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2734
2735        let mut row_count = 0;
2736        for batch in record_reader {
2737            let batch = batch.unwrap();
2738            row_count += batch.num_rows();
2739            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2740            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2741
2742            // This file contains floats from a standard normal distribution
2743            for &x in f32_col.values() {
2744                assert!(x > -10.0);
2745                assert!(x < 10.0);
2746            }
2747            for &x in f64_col.values() {
2748                assert!(x > -10.0);
2749                assert!(x < 10.0);
2750            }
2751        }
2752        assert_eq!(row_count, 300);
2753    }
2754
    #[test]
    fn test_read_extended_byte_stream_split() {
        // The file stores each logical column twice: once with a conventional
        // encoding and once with BYTE_STREAM_SPLIT. Paired columns must decode
        // to identical values.
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // 0,1 are f16
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 2,3 are f32
            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 4,5 are f64
            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 6,7 are i32
            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 8,9 are i64
            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 10,11 are FLBA(5)
            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 12,13 are FLBA(4) (decimal(7,3))
            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }
2834
2835    #[test]
2836    fn test_read_incorrect_map_schema_file() {
2837        let testdata = arrow::util::test_util::parquet_test_data();
2838        // see https://github.com/apache/parquet-testing/pull/47
2839        let path = format!("{testdata}/incorrect_map_schema.parquet");
2840        let file = File::open(path).unwrap();
2841        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2842
2843        let batch = record_reader.next().unwrap().unwrap();
2844        assert_eq!(batch.num_rows(), 1);
2845
2846        let expected_schema = Schema::new(vec![Field::new(
2847            "my_map",
2848            ArrowDataType::Map(
2849                Arc::new(Field::new(
2850                    "key_value",
2851                    ArrowDataType::Struct(Fields::from(vec![
2852                        Field::new("key", ArrowDataType::Utf8, false),
2853                        Field::new("value", ArrowDataType::Utf8, true),
2854                    ])),
2855                    false,
2856                )),
2857                false,
2858            ),
2859            true,
2860        )]);
2861        assert_eq!(batch.schema().as_ref(), &expected_schema);
2862
2863        assert_eq!(batch.num_rows(), 1);
2864        assert_eq!(batch.column(0).null_count(), 0);
2865        assert_eq!(
2866            batch.column(0).as_map().keys().as_ref(),
2867            &StringArray::from(vec!["parent", "name"])
2868        );
2869        assert_eq!(
2870            batch.column(0).as_map().values().as_ref(),
2871            &StringArray::from(vec!["another", "report"])
2872        );
2873    }
2874
    #[test]
    fn test_read_dict_fixed_size_binary() {
        // Round-trips a dictionary-encoded FixedSizeBinary(8) column through parquet.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowType::Dictionary(
                Box::new(ArrowType::UInt8),
                Box::new(ArrowType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        // Keys [0, 0, 1] reference two 8-byte dictionary values.
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // All 3 rows fit in one batch, and it must match what was written.
        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }
2909
    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        // the `StructArrayReader` will check the definition and repetition levels of the first
        // child column in the struct to determine nullability for the struct. If the first
        // column is being read by `ByteArrayDictionaryReader` we need to ensure that the
        // nullability is interpreted correctly from the rep/def level buffers managed by
        // this array reader.

        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowType::Dictionary(
                    Box::new(ArrowType::UInt8),
                    Box::new(ArrowType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowType::Struct(struct_fields.clone()),
            true,
        )]));

        // Struct row 3 (0-based) is null; its child slots still carry placeholder values.
        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }
2966
    /// Parameters for single_column_reader_test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write to parquet (total rows written =
        /// `num_row_groups * num_rows`)
        num_row_groups: usize,
        /// Total number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in column or None if required
        null_percent: Option<usize>,
        /// Set write batch size
        ///
        /// This is the number of rows that are written at once to a page and
        /// therefore acts as a bound on the page granularity of a row group
        write_batch_size: usize,
        /// Maximum size of page in bytes
        max_data_page_size: usize,
        /// Maximum size of dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// row selections and total selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// row filter
        row_filter: Option<Vec<bool>>,
        /// limit
        limit: Option<usize>,
        /// offset
        offset: Option<usize>,
    }
3003
    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                // Only the presence of these is printed, not their (large) contents
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }
3025
3026    impl Default for TestOptions {
3027        fn default() -> Self {
3028            Self {
3029                num_row_groups: 2,
3030                num_rows: 100,
3031                record_batch_size: 15,
3032                null_percent: None,
3033                write_batch_size: 64,
3034                max_data_page_size: 1024 * 1024,
3035                max_dict_page_size: 1024 * 1024,
3036                writer_version: WriterVersion::PARQUET_1_0,
3037                enabled_statistics: EnabledStatistics::Page,
3038                encoding: Encoding::PLAIN,
3039                row_selections: None,
3040                row_filter: None,
3041                limit: None,
3042                offset: None,
3043            }
3044        }
3045    }
3046
3047    impl TestOptions {
3048        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3049            Self {
3050                num_row_groups,
3051                num_rows,
3052                record_batch_size,
3053                ..Default::default()
3054            }
3055        }
3056
3057        fn with_null_percent(self, null_percent: usize) -> Self {
3058            Self {
3059                null_percent: Some(null_percent),
3060                ..self
3061            }
3062        }
3063
3064        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3065            Self {
3066                max_data_page_size,
3067                ..self
3068            }
3069        }
3070
3071        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3072            Self {
3073                max_dict_page_size,
3074                ..self
3075            }
3076        }
3077
3078        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3079            Self {
3080                enabled_statistics,
3081                ..self
3082            }
3083        }
3084
3085        fn with_row_selections(self) -> Self {
3086            assert!(self.row_filter.is_none(), "Must set row selection first");
3087
3088            let mut rng = rng();
3089            let step = rng.random_range(self.record_batch_size..self.num_rows);
3090            let row_selections = create_test_selection(
3091                step,
3092                self.num_row_groups * self.num_rows,
3093                rng.random::<bool>(),
3094            );
3095            Self {
3096                row_selections: Some(row_selections),
3097                ..self
3098            }
3099        }
3100
3101        fn with_row_filter(self) -> Self {
3102            let row_count = match &self.row_selections {
3103                Some((_, count)) => *count,
3104                None => self.num_row_groups * self.num_rows,
3105            };
3106
3107            let mut rng = rng();
3108            Self {
3109                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3110                ..self
3111            }
3112        }
3113
3114        fn with_limit(self, limit: usize) -> Self {
3115            Self {
3116                limit: Some(limit),
3117                ..self
3118            }
3119        }
3120
3121        fn with_offset(self, offset: usize) -> Self {
3122            Self {
3123                offset: Some(offset),
3124                ..self
3125            }
3126        }
3127
3128        fn writer_props(&self) -> WriterProperties {
3129            let builder = WriterProperties::builder()
3130                .set_data_page_size_limit(self.max_data_page_size)
3131                .set_write_batch_size(self.write_batch_size)
3132                .set_writer_version(self.writer_version)
3133                .set_statistics_enabled(self.enabled_statistics);
3134
3135            let builder = match self.encoding {
3136                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3137                    .set_dictionary_enabled(true)
3138                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3139                _ => builder
3140                    .set_dictionary_enabled(false)
3141                    .set_encoding(self.encoding),
3142            };
3143
3144            builder.build()
3145        }
3146    }
3147
    /// Create a parquet file and then read it using
    /// `ParquetFileArrowReader` using a standard set of parameters
    /// `opts`.
    ///
    /// `rand_max` represents the maximum size of value to pass to the
    /// value generator `G`. Every option set below is additionally
    /// crossed with both writer versions and every encoding in
    /// `encodings`.
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each; 15
            // does not divide 100)
            TestOptions::new(2, 100, 15),
            // choose record_batch_size (5) so batches sometimes fall
            // on row group boundaries (3 row groups of 25 rows each).
            // Tests buffer refilling edge cases.
            TestOptions::new(3, 25, 5),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundaries (4 row groups of 100
            // rows each). Tests buffer refilling edge cases.
            TestOptions::new(4, 100, 25),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127).with_null_percent(0),
            // Test optional with nulls
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Test with limit of 0
            TestOptions::new(4, 100, 25).with_limit(0),
            // Test with limit of 50
            TestOptions::new(4, 100, 25).with_limit(50),
            // Test with limit of 10, smaller than a single batch
            TestOptions::new(4, 100, 25).with_limit(10),
            // Test with limit of 101, crossing a row group boundary
            TestOptions::new(4, 100, 25).with_limit(101),
            // Test with offset + limit within a single row group
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            // Test with offset + limit reaching exactly a row group boundary
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            // Test with offset + limit crossing a row group boundary
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Test with no page-level statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            // Test with no statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test with all null
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test skip (row selections)

            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each)
            TestOptions::new(2, 100, 15).with_row_selections(),
            // choose record_batch_size (5) so batches sometimes fall
            // on row group boundaries (3 row groups of 25 rows each).
            // Tests buffer refilling edge cases.
            TestOptions::new(3, 25, 5).with_row_selections(),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundaries. Tests buffer
            // refilling edge cases.
            TestOptions::new(4, 100, 25).with_row_selections(),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            // Test optional with nulls
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            // Test optional with nulls and a limit
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            // Test optional with nulls, an offset and a limit
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Test filter (row filters)

            // Test with row filter
            TestOptions::new(4, 100, 25).with_row_filter(),
            // Test with row selection and row filter
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            // Test with nulls and row filter
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            // Test with nulls and row filter and small pages
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            // Test with row selection and no offset index and small pages
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        // Cross every option set with both writer versions and every
        // requested encoding.
        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }
3299
    /// Create a parquet file and then read it using
    /// `ParquetFileArrowReader` using the parameters described in
    /// `opts`.
    ///
    /// The file contains a single column of physical type `T` generated
    /// by `G`; `converter` maps the generated values into the arrow
    /// array the reader is expected to produce.
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print out options to facilitate debugging failures on CI
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(),
            converted_type,
            arrow_type,
            opts
        );

        // According to null_percent, generate per-row-group definition
        // levels (1 = value present, 0 = null). A required column gets no
        // definition levels at all.
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        // Generate random table data: one vector per row group containing
        // only the non-null values (nulls are encoded via def_levels).
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        // FIXED_LEN_BYTE_ARRAY uses rand_max as its byte length, INT96 is
        // 12 bytes; -1 otherwise (presumably ignored for variable-length
        // types — matches the builder default).
        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        // Single-leaf parquet schema: message test_schema { leaf }
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        // If an arrow type was requested, it is embedded in the file
        // metadata by the writer helper below.
        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Only request the page index when page statistics were written.
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
            opts.enabled_statistics == EnabledStatistics::Page,
        ));

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        // Compute the expected rows, applying the row selection if any.
        // NOTE: despite its name, `skip_data` accumulates the *selected*
        // rows; `without_skip_data` is drained as selectors are consumed.
        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                // Get the flattened table data
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        // Apply the row filter (if any) to the expected data, and install
        // an equivalent ArrowPredicate on the reader. The predicate serves
        // batch-aligned slices of the mask, tracking progress in
        // `filter_offset`.
        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then(|| d))
                    .collect();

                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        // Offset and limit are applied after selection and filtering.
        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        // Read back batch by batch, comparing each against the matching
        // slice of expected data; the reader must be exhausted exactly
        // when the expected data is.
        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = batch.column(0);

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                // Also check the concrete array type, not just the
                // logical data type and contents.
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                // All expected rows consumed: no further batches allowed.
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }
3495
3496    fn gen_expected_data<T: DataType>(
3497        def_levels: Option<&Vec<Vec<i16>>>,
3498        values: &[Vec<T::T>],
3499    ) -> Vec<Option<T::T>> {
3500        let data: Vec<Option<T::T>> = match def_levels {
3501            Some(levels) => {
3502                let mut values_iter = values.iter().flatten();
3503                levels
3504                    .iter()
3505                    .flatten()
3506                    .map(|d| match d {
3507                        1 => Some(values_iter.next().cloned().unwrap()),
3508                        0 => None,
3509                        _ => unreachable!(),
3510                    })
3511                    .collect()
3512            }
3513            None => values.iter().flatten().cloned().map(Some).collect(),
3514        };
3515        data
3516    }
3517
3518    fn generate_single_column_file_with_data<T: DataType>(
3519        values: &[Vec<T::T>],
3520        def_levels: Option<&Vec<Vec<i16>>>,
3521        file: File,
3522        schema: TypePtr,
3523        field: Option<Field>,
3524        opts: &TestOptions,
3525    ) -> Result<ParquetMetaData> {
3526        let mut writer_props = opts.writer_props();
3527        if let Some(field) = field {
3528            let arrow_schema = Schema::new(vec![field]);
3529            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3530        }
3531
3532        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3533
3534        for (idx, v) in values.iter().enumerate() {
3535            let def_levels = def_levels.map(|d| d[idx].as_slice());
3536            let mut row_group_writer = writer.next_row_group()?;
3537            {
3538                let mut column_writer = row_group_writer
3539                    .next_column()?
3540                    .expect("Column writer is none!");
3541
3542                column_writer
3543                    .typed::<T>()
3544                    .write_batch(v, def_levels, None)?;
3545
3546                column_writer.close()?;
3547            }
3548            row_group_writer.close()?;
3549        }
3550
3551        writer.close()
3552    }
3553
3554    fn get_test_file(file_name: &str) -> File {
3555        let mut path = PathBuf::new();
3556        path.push(arrow::util::test_util::arrow_test_data());
3557        path.push(file_name);
3558
3559        File::open(path.as_path()).expect("File not found!")
3560    }
3561
3562    #[test]
3563    fn test_read_structs() {
3564        // This particular test file has columns of struct types where there is
3565        // a column that has the same name as one of the struct fields
3566        // (see: ARROW-11452)
3567        let testdata = arrow::util::test_util::parquet_test_data();
3568        let path = format!("{testdata}/nested_structs.rust.parquet");
3569        let file = File::open(&path).unwrap();
3570        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3571
3572        for batch in record_batch_reader {
3573            batch.unwrap();
3574        }
3575
3576        let file = File::open(&path).unwrap();
3577        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3578
3579        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3580        let projected_reader = builder
3581            .with_projection(mask)
3582            .with_batch_size(60)
3583            .build()
3584            .unwrap();
3585
3586        let expected_schema = Schema::new(vec![
3587            Field::new(
3588                "roll_num",
3589                ArrowDataType::Struct(Fields::from(vec![Field::new(
3590                    "count",
3591                    ArrowDataType::UInt64,
3592                    false,
3593                )])),
3594                false,
3595            ),
3596            Field::new(
3597                "PC_CUR",
3598                ArrowDataType::Struct(Fields::from(vec![
3599                    Field::new("mean", ArrowDataType::Int64, false),
3600                    Field::new("sum", ArrowDataType::Int64, false),
3601                ])),
3602                false,
3603            ),
3604        ]);
3605
3606        // Tests for #1652 and #1654
3607        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3608
3609        for batch in projected_reader {
3610            let batch = batch.unwrap();
3611            assert_eq!(batch.schema().as_ref(), &expected_schema);
3612        }
3613    }
3614
3615    #[test]
3616    // same as test_read_structs but constructs projection mask via column names
3617    fn test_read_structs_by_name() {
3618        let testdata = arrow::util::test_util::parquet_test_data();
3619        let path = format!("{testdata}/nested_structs.rust.parquet");
3620        let file = File::open(&path).unwrap();
3621        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3622
3623        for batch in record_batch_reader {
3624            batch.unwrap();
3625        }
3626
3627        let file = File::open(&path).unwrap();
3628        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3629
3630        let mask = ProjectionMask::columns(
3631            builder.parquet_schema(),
3632            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3633        );
3634        let projected_reader = builder
3635            .with_projection(mask)
3636            .with_batch_size(60)
3637            .build()
3638            .unwrap();
3639
3640        let expected_schema = Schema::new(vec![
3641            Field::new(
3642                "roll_num",
3643                ArrowDataType::Struct(Fields::from(vec![Field::new(
3644                    "count",
3645                    ArrowDataType::UInt64,
3646                    false,
3647                )])),
3648                false,
3649            ),
3650            Field::new(
3651                "PC_CUR",
3652                ArrowDataType::Struct(Fields::from(vec![
3653                    Field::new("mean", ArrowDataType::Int64, false),
3654                    Field::new("sum", ArrowDataType::Int64, false),
3655                ])),
3656                false,
3657            ),
3658        ]);
3659
3660        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3661
3662        for batch in projected_reader {
3663            let batch = batch.unwrap();
3664            assert_eq!(batch.schema().as_ref(), &expected_schema);
3665        }
3666    }
3667
3668    #[test]
3669    fn test_read_maps() {
3670        let testdata = arrow::util::test_util::parquet_test_data();
3671        let path = format!("{testdata}/nested_maps.snappy.parquet");
3672        let file = File::open(path).unwrap();
3673        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3674
3675        for batch in record_batch_reader {
3676            batch.unwrap();
3677        }
3678    }
3679
3680    #[test]
3681    fn test_nested_nullability() {
3682        let message_type = "message nested {
3683          OPTIONAL Group group {
3684            REQUIRED INT32 leaf;
3685          }
3686        }";
3687
3688        let file = tempfile::tempfile().unwrap();
3689        let schema = Arc::new(parse_message_type(message_type).unwrap());
3690
3691        {
3692            // Write using low-level parquet API (#1167)
3693            let mut writer =
3694                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3695                    .unwrap();
3696
3697            {
3698                let mut row_group_writer = writer.next_row_group().unwrap();
3699                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3700
3701                column_writer
3702                    .typed::<Int32Type>()
3703                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3704                    .unwrap();
3705
3706                column_writer.close().unwrap();
3707                row_group_writer.close().unwrap();
3708            }
3709
3710            writer.close().unwrap();
3711        }
3712
3713        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3714        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3715
3716        let reader = builder.with_projection(mask).build().unwrap();
3717
3718        let expected_schema = Schema::new(vec![Field::new(
3719            "group",
3720            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3721            true,
3722        )]);
3723
3724        let batch = reader.into_iter().next().unwrap().unwrap();
3725        assert_eq!(batch.schema().as_ref(), &expected_schema);
3726        assert_eq!(batch.num_rows(), 4);
3727        assert_eq!(batch.column(0).null_count(), 2);
3728    }
3729
    #[test]
    fn test_dictionary_preservation() {
        // Writes two row groups of dictionary-encoded optional strings and
        // verifies that batches within one row group share the same
        // dictionary values, while batches spanning a row group boundary
        // get a freshly computed one.
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        // Ask the reader to produce Dictionary<Int32, Utf8> arrays.
        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        // Non-null values per row group; the def_levels below (1 = present,
        // 0 = null) expand these to 8 and 9 logical rows respectively.
        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Read the 17 rows back in batches of 3 (so batch 3 spans the
        // 8-row / 9-row row group boundary).
        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        // (row count, null count) per batch.
        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        // Extract the dictionary values array backing a batch's column.
        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // First and second batch in same row group -> same dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // Third batch spans row group -> computed dictionary
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // Fourth, fifth and sixth from same row group -> same dictionary
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }
3822
3823    #[test]
3824    fn test_read_null_list() {
3825        let testdata = arrow::util::test_util::parquet_test_data();
3826        let path = format!("{testdata}/null_list.parquet");
3827        let file = File::open(path).unwrap();
3828        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3829
3830        let batch = record_batch_reader.next().unwrap().unwrap();
3831        assert_eq!(batch.num_rows(), 1);
3832        assert_eq!(batch.num_columns(), 1);
3833        assert_eq!(batch.column(0).len(), 1);
3834
3835        let list = batch
3836            .column(0)
3837            .as_any()
3838            .downcast_ref::<ListArray>()
3839            .unwrap();
3840        assert_eq!(list.len(), 1);
3841        assert!(list.is_valid(0));
3842
3843        let val = list.value(0);
3844        assert_eq!(val.len(), 0);
3845    }
3846
3847    #[test]
3848    fn test_null_schema_inference() {
3849        let testdata = arrow::util::test_util::parquet_test_data();
3850        let path = format!("{testdata}/null_list.parquet");
3851        let file = File::open(path).unwrap();
3852
3853        let arrow_field = Field::new(
3854            "emptylist",
3855            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3856            true,
3857        );
3858
3859        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3860        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3861        let schema = builder.schema();
3862        assert_eq!(schema.fields().len(), 1);
3863        assert_eq!(schema.field(0), &arrow_field);
3864    }
3865
    #[test]
    fn test_skip_metadata() {
        // Verify `with_skip_arrow_metadata(true)`: by default the reader
        // restores the arrow schema embedded in the parquet key-value metadata
        // (including field-level metadata); with the option set it falls back
        // to the schema derived from the parquet types, dropping the metadata.
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        // Writes `batch` to a fresh temporary file with the given writer
        // version and returns the file handle
        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        // Default options: the embedded arrow schema (with metadata) is restored
        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        // Skipping the embedded metadata yields the plain schema
        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        // Same behavior for both writer versions
        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }
3924
3925    fn write_parquet_from_iter<I, F>(value: I) -> File
3926    where
3927        I: IntoIterator<Item = (F, ArrayRef)>,
3928        F: AsRef<str>,
3929    {
3930        let batch = RecordBatch::try_from_iter(value).unwrap();
3931        let file = tempfile().unwrap();
3932        let mut writer =
3933            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3934        writer.write(&batch).unwrap();
3935        writer.close().unwrap();
3936        file
3937    }
3938
3939    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3940    where
3941        I: IntoIterator<Item = (F, ArrayRef)>,
3942        F: AsRef<str>,
3943    {
3944        let file = write_parquet_from_iter(value);
3945        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3946        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3947            file.try_clone().unwrap(),
3948            options_with_schema,
3949        );
3950        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3951    }
3952
3953    #[test]
3954    fn test_schema_too_few_columns() {
3955        run_schema_test_with_error(
3956            vec![
3957                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3958                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3959            ],
3960            Arc::new(Schema::new(vec![Field::new(
3961                "int64",
3962                ArrowDataType::Int64,
3963                false,
3964            )])),
3965            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3966        );
3967    }
3968
3969    #[test]
3970    fn test_schema_too_many_columns() {
3971        run_schema_test_with_error(
3972            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3973            Arc::new(Schema::new(vec![
3974                Field::new("int64", ArrowDataType::Int64, false),
3975                Field::new("int32", ArrowDataType::Int32, false),
3976            ])),
3977            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3978        );
3979    }
3980
3981    #[test]
3982    fn test_schema_mismatched_column_names() {
3983        run_schema_test_with_error(
3984            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3985            Arc::new(Schema::new(vec![Field::new(
3986                "other",
3987                ArrowDataType::Int64,
3988                false,
3989            )])),
3990            "Arrow: incompatible arrow schema, expected field named int64 got other",
3991        );
3992    }
3993
3994    #[test]
3995    fn test_schema_incompatible_columns() {
3996        run_schema_test_with_error(
3997            vec![
3998                (
3999                    "col1_invalid",
4000                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4001                ),
4002                (
4003                    "col2_valid",
4004                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4005                ),
4006                (
4007                    "col3_invalid",
4008                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4009                ),
4010            ],
4011            Arc::new(Schema::new(vec![
4012                Field::new("col1_invalid", ArrowDataType::Int32, false),
4013                Field::new("col2_valid", ArrowDataType::Int32, false),
4014                Field::new("col3_invalid", ArrowDataType::Int32, false),
4015            ])),
4016            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4017        );
4018    }
4019
    #[test]
    fn test_one_incompatible_nested_column() {
        // The supplied schema requests Int32 for `nested1_invalid`, which was
        // written as Int64 inside a struct; the mismatch is reported at the
        // level of the whole `nested` struct field.
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        // Same shape as the written struct, but with the invalid Int32 request
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }
4059
4060    /// Return parquet data with a single column of utf8 strings
4061    fn utf8_parquet() -> Bytes {
4062        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4063        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4064        let props = None;
4065        // write parquet file with non nullable strings
4066        let mut parquet_data = vec![];
4067        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4068        writer.write(&batch).unwrap();
4069        writer.close().unwrap();
4070        Bytes::from(parquet_data)
4071    }
4072
4073    #[test]
4074    fn test_schema_error_bad_types() {
4075        // verify incompatible schemas error on read
4076        let parquet_data = utf8_parquet();
4077
4078        // Ask to read it back with an incompatible schema (int vs string)
4079        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4080            "column1",
4081            arrow::datatypes::DataType::Int32,
4082            false,
4083        )]));
4084
4085        // read it back out
4086        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4087        let err =
4088            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4089                .unwrap_err();
4090        assert_eq!(
4091            err.to_string(),
4092            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4093        )
4094    }
4095
4096    #[test]
4097    fn test_schema_error_bad_nullability() {
4098        // verify incompatible schemas error on read
4099        let parquet_data = utf8_parquet();
4100
4101        // Ask to read it back with an incompatible schema (nullability mismatch)
4102        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4103            "column1",
4104            arrow::datatypes::DataType::Utf8,
4105            true,
4106        )]));
4107
4108        // read it back out
4109        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4110        let err =
4111            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4112                .unwrap_err();
4113        assert_eq!(
4114            err.to_string(),
4115            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4116        )
4117    }
4118
    #[test]
    fn test_read_binary_as_utf8() {
        // A supplied schema may request string types for columns written as
        // the corresponding binary types; verify all three binary -> string
        // conversions (Binary->Utf8, LargeBinary->LargeUtf8,
        // BinaryView->Utf8View) read back the same values.
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        // Request the string counterpart of each written binary type
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        // Binary -> Utf8
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        // LargeBinary -> LargeUtf8
        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        // BinaryView -> Utf8View
        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }
4192
    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        // Write binary values that are not valid UTF-8, then request the
        // column as Utf8 via a supplied schema: decoding must reject the data.
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        // NOTE(review): the expected "Invalid UTF8 sequence at" panic appears
        // to be raised while decoding inside `next()` (array validation);
        // `unwrap_err()` itself does not panic on an `Err` — confirm if this
        // test is ever restructured.
        arrow_reader.next().unwrap().unwrap_err();
    }
4220
    #[test]
    fn test_with_schema() {
        // Verify reading with a supplied schema that requests type coercions:
        // Int32 -> Timestamp(second, tz), Date32 -> Date64, and inside a
        // struct: Utf8 -> Dictionary<Int32, Utf8> and
        // Int64 -> Timestamp(nanosecond, tz).
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        // Schema requesting the coerced types for the struct's children
        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        // The reader must report exactly the supplied schema
        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        // Int32 value 0 interpreted as seconds in the +01:00 timezone
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        // Date32 day 0 coerced to Date64
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        // "a","a","a","b" dictionary-encoded: two values, keys [0,0,0,1]
        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        // Int64 value 1 interpreted as nanoseconds in the +10:00 timezone
        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }
4356
4357    #[test]
4358    fn test_empty_projection() {
4359        let testdata = arrow::util::test_util::parquet_test_data();
4360        let path = format!("{testdata}/alltypes_plain.parquet");
4361        let file = File::open(path).unwrap();
4362
4363        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4364        let file_metadata = builder.metadata().file_metadata();
4365        let expected_rows = file_metadata.num_rows() as usize;
4366
4367        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4368        let batch_reader = builder
4369            .with_projection(mask)
4370            .with_batch_size(2)
4371            .build()
4372            .unwrap();
4373
4374        let mut total_rows = 0;
4375        for maybe_batch in batch_reader {
4376            let batch = maybe_batch.unwrap();
4377            total_rows += batch.num_rows();
4378            assert_eq!(batch.num_columns(), 0);
4379            assert!(batch.num_rows() <= 2);
4380        }
4381
4382        assert_eq!(total_rows, expected_rows);
4383    }
4384
4385    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4386        let schema = Arc::new(Schema::new(vec![Field::new(
4387            "list",
4388            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4389            true,
4390        )]));
4391
4392        let mut buf = Vec::with_capacity(1024);
4393
4394        let mut writer = ArrowWriter::try_new(
4395            &mut buf,
4396            schema.clone(),
4397            Some(
4398                WriterProperties::builder()
4399                    .set_max_row_group_row_count(Some(row_group_size))
4400                    .build(),
4401            ),
4402        )
4403        .unwrap();
4404        for _ in 0..2 {
4405            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4406            for _ in 0..(batch_size) {
4407                list_builder.append(true);
4408            }
4409            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4410                .unwrap();
4411            writer.write(&batch).unwrap();
4412        }
4413        writer.close().unwrap();
4414
4415        let mut record_reader =
4416            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4417        assert_eq!(
4418            batch_size,
4419            record_reader.next().unwrap().unwrap().num_rows()
4420        );
4421        assert_eq!(
4422            batch_size,
4423            record_reader.next().unwrap().unwrap().num_rows()
4424        );
4425    }
4426
4427    #[test]
4428    fn test_row_group_exact_multiple() {
4429        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4430        test_row_group_batch(8, 8);
4431        test_row_group_batch(10, 8);
4432        test_row_group_batch(8, 10);
4433        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4434        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4435        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4436        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4437        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4438    }
4439
    /// Given a RecordBatch containing all the column data, return the expected batches given
    /// a `batch_size` and `selection`
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        // Absolute row position reached so far
        let mut row_offset = 0;
        // Start of the current contiguous run of selected rows, if any
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            // Consume selectors in windows of at most `batch_size` rows
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    // Selector extends past this window: consume only what fits
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    // Selector fits entirely within this window
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    // A skip run flushes any in-progress run of selected rows
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    // Start (or extend) a run of selected rows
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        // Flush the trailing run of selected rows, if any
        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check, all batches except the final should be the batch size
        // NOTE(review): the slice below panics if `expected_batches` is empty
        // (a selection skipping everything) — callers always select some rows.
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }
4495
4496    fn create_test_selection(
4497        step_len: usize,
4498        total_len: usize,
4499        skip_first: bool,
4500    ) -> (RowSelection, usize) {
4501        let mut remaining = total_len;
4502        let mut skip = skip_first;
4503        let mut vec = vec![];
4504        let mut selected_count = 0;
4505        while remaining != 0 {
4506            let step = if remaining > step_len {
4507                step_len
4508            } else {
4509                remaining
4510            };
4511            vec.push(RowSelector {
4512                row_count: step,
4513                skip,
4514            });
4515            remaining -= step;
4516            if !skip {
4517                selected_count += step;
4518            }
4519            skip = !skip;
4520        }
4521        (vec.into(), selected_count)
4522    }
4523
    #[test]
    fn test_scan_row_with_selection() {
        // Read `alltypes_tiny_pages_plain.parquet` (7300 rows) once without a
        // selection to obtain reference data, then compare selective reads
        // against the slices computed by `get_expected_batches`.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        // NOTE(review): `selection_len` is only used in the assertion message;
        // the selection itself is always built with `batch_size` as the run
        // length (see `create_test_selection` call) — confirm intent.
        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        // total row count 7300
        // 1. test selection len more than one page row count
        do_test(1000, 1000);

        // 2. test selection len less than one page row count
        do_test(20, 20);

        // 3. test selection_len less than batch_size
        do_test(20, 5);

        // 4. test selection_len more than batch_size
        // If batch_size < selection_len
        // NOTE(review): this call is identical to case 3, so the
        // "selection_len more than batch_size" case is not actually
        // exercised — confirm and fix the arguments if intended.
        do_test(20, 5);

        // Builds a reader over `test_file` that applies `selections`,
        // requiring the page index so selections can skip whole pages
        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }
4578
4579    #[test]
4580    fn test_batch_size_overallocate() {
4581        let testdata = arrow::util::test_util::parquet_test_data();
4582        // `alltypes_plain.parquet` only have 8 rows
4583        let path = format!("{testdata}/alltypes_plain.parquet");
4584        let test_file = File::open(path).unwrap();
4585
4586        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4587        let num_rows = builder.metadata.file_metadata().num_rows();
4588        let reader = builder
4589            .with_batch_size(1024)
4590            .with_projection(ProjectionMask::all())
4591            .build()
4592            .unwrap();
4593        assert_ne!(1024, num_rows);
4594        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4595    }
4596
    #[test]
    fn test_read_with_page_index_enabled() {
        // Requiring the page index must work both for files that have one and
        // for files that do not.
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` has page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // `alltypes_plain.parquet` doesn't have page index
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            // Although `Vec<Vec<PageLocation>>` of each row group is empty,
            // we should read the file successfully.
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }
4633
    #[test]
    fn test_raw_repetition() {
        // Schema with REPEATED fields that are not wrapped in a LIST
        // annotation, exercising "raw" repetition handling on read.
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // column 0: eventType — write_batch(values, def_levels, rep_levels)
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // column 1: category — two values within the same row (rep levels 0, 1)
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // column 2: filter.error — single value in the single row
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        // All three columns belong to one logical row
        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        // Reading with no projection yields all three columns
        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // Projecting each leaf individually must reproduce the matching
        // column of the unprojected read
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }
4695
4696    #[test]
4697    fn test_read_lz4_raw() {
4698        let testdata = arrow::util::test_util::parquet_test_data();
4699        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4700        let file = File::open(path).unwrap();
4701
4702        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4703            .unwrap()
4704            .collect::<Result<Vec<_>, _>>()
4705            .unwrap();
4706        assert_eq!(batches.len(), 1);
4707        let batch = &batches[0];
4708
4709        assert_eq!(batch.num_columns(), 3);
4710        assert_eq!(batch.num_rows(), 4);
4711
4712        // https://github.com/apache/parquet-testing/pull/18
4713        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4714        assert_eq!(
4715            a.values(),
4716            &[1593604800, 1593604800, 1593604801, 1593604801]
4717        );
4718
4719        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4720        let a: Vec<_> = a.iter().flatten().collect();
4721        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4722
4723        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4724        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4725    }
4726
    // This test is to ensure backward compatibility. It tests 2 files containing the LZ4 CompressionCodec
    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4729    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4730    //    LZ4_HADOOP algorithm for compression.
4731    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4732    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4733    //    older parquet-cpp versions.
4734    //
4735    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4736    #[test]
4737    fn test_read_lz4_hadoop_fallback() {
4738        for file in [
4739            "hadoop_lz4_compressed.parquet",
4740            "non_hadoop_lz4_compressed.parquet",
4741        ] {
4742            let testdata = arrow::util::test_util::parquet_test_data();
4743            let path = format!("{testdata}/{file}");
4744            let file = File::open(path).unwrap();
4745            let expected_rows = 4;
4746
4747            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4748                .unwrap()
4749                .collect::<Result<Vec<_>, _>>()
4750                .unwrap();
4751            assert_eq!(batches.len(), 1);
4752            let batch = &batches[0];
4753
4754            assert_eq!(batch.num_columns(), 3);
4755            assert_eq!(batch.num_rows(), expected_rows);
4756
4757            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4758            assert_eq!(
4759                a.values(),
4760                &[1593604800, 1593604800, 1593604801, 1593604801]
4761            );
4762
4763            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4764            let b: Vec<_> = b.iter().flatten().collect();
4765            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4766
4767            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4768            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4769        }
4770    }
4771
4772    #[test]
4773    fn test_read_lz4_hadoop_large() {
4774        let testdata = arrow::util::test_util::parquet_test_data();
4775        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4776        let file = File::open(path).unwrap();
4777        let expected_rows = 10000;
4778
4779        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4780            .unwrap()
4781            .collect::<Result<Vec<_>, _>>()
4782            .unwrap();
4783        assert_eq!(batches.len(), 1);
4784        let batch = &batches[0];
4785
4786        assert_eq!(batch.num_columns(), 1);
4787        assert_eq!(batch.num_rows(), expected_rows);
4788
4789        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4790        let a: Vec<_> = a.iter().flatten().collect();
4791        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4792        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4793        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4794        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4795    }
4796
4797    #[test]
4798    #[cfg(feature = "snap")]
4799    fn test_read_nested_lists() {
4800        let testdata = arrow::util::test_util::parquet_test_data();
4801        let path = format!("{testdata}/nested_lists.snappy.parquet");
4802        let file = File::open(path).unwrap();
4803
4804        let f = file.try_clone().unwrap();
4805        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4806        let expected = reader.next().unwrap().unwrap();
4807        assert_eq!(expected.num_rows(), 3);
4808
4809        let selection = RowSelection::from(vec![
4810            RowSelector::skip(1),
4811            RowSelector::select(1),
4812            RowSelector::skip(1),
4813        ]);
4814        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4815            .unwrap()
4816            .with_row_selection(selection)
4817            .build()
4818            .unwrap();
4819
4820        let actual = reader.next().unwrap().unwrap();
4821        assert_eq!(actual.num_rows(), 1);
4822        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4823    }
4824
4825    #[test]
4826    fn test_arbitrary_decimal() {
4827        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4828        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4829            .with_precision_and_scale(19, 0)
4830            .unwrap();
4831        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4832            .with_precision_and_scale(12, 0)
4833            .unwrap();
4834        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4835            .with_precision_and_scale(17, 10)
4836            .unwrap();
4837
4838        let written = RecordBatch::try_from_iter([
4839            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4840            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4841            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4842        ])
4843        .unwrap();
4844
4845        let mut buffer = Vec::with_capacity(1024);
4846        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4847        writer.write(&written).unwrap();
4848        writer.close().unwrap();
4849
4850        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4851            .unwrap()
4852            .collect::<Result<Vec<_>, _>>()
4853            .unwrap();
4854
4855        assert_eq!(&written.slice(0, 8), &read[0]);
4856    }
4857
4858    #[test]
4859    fn test_list_skip() {
4860        let mut list = ListBuilder::new(Int32Builder::new());
4861        list.append_value([Some(1), Some(2)]);
4862        list.append_value([Some(3)]);
4863        list.append_value([Some(4)]);
4864        let list = list.finish();
4865        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4866
4867        // First page contains 2 values but only 1 row
4868        let props = WriterProperties::builder()
4869            .set_data_page_row_count_limit(1)
4870            .set_write_batch_size(2)
4871            .build();
4872
4873        let mut buffer = Vec::with_capacity(1024);
4874        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4875        writer.write(&batch).unwrap();
4876        writer.close().unwrap();
4877
4878        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4879        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4880            .unwrap()
4881            .with_row_selection(selection.into())
4882            .build()
4883            .unwrap();
4884        let out = reader.next().unwrap().unwrap();
4885        assert_eq!(out.num_rows(), 1);
4886        assert_eq!(out, batch.slice(2, 1));
4887    }
4888
4889    fn test_decimal32_roundtrip() {
4890        let d = |values: Vec<i32>, p: u8| {
4891            let iter = values.into_iter();
4892            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4893                .with_precision_and_scale(p, 2)
4894                .unwrap()
4895        };
4896
4897        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4898        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4899
4900        let mut buffer = Vec::with_capacity(1024);
4901        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4902        writer.write(&batch).unwrap();
4903        writer.close().unwrap();
4904
4905        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4906        let t1 = builder.parquet_schema().columns()[0].physical_type();
4907        assert_eq!(t1, PhysicalType::INT32);
4908
4909        let mut reader = builder.build().unwrap();
4910        assert_eq!(batch.schema(), reader.schema());
4911
4912        let out = reader.next().unwrap().unwrap();
4913        assert_eq!(batch, out);
4914    }
4915
4916    fn test_decimal64_roundtrip() {
4917        // Precision <= 9 -> INT32
4918        // Precision <= 18 -> INT64
4919
4920        let d = |values: Vec<i64>, p: u8| {
4921            let iter = values.into_iter();
4922            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4923                .with_precision_and_scale(p, 2)
4924                .unwrap()
4925        };
4926
4927        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4928        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4929        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4930
4931        let batch = RecordBatch::try_from_iter([
4932            ("d1", Arc::new(d1) as ArrayRef),
4933            ("d2", Arc::new(d2) as ArrayRef),
4934            ("d3", Arc::new(d3) as ArrayRef),
4935        ])
4936        .unwrap();
4937
4938        let mut buffer = Vec::with_capacity(1024);
4939        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4940        writer.write(&batch).unwrap();
4941        writer.close().unwrap();
4942
4943        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4944        let t1 = builder.parquet_schema().columns()[0].physical_type();
4945        assert_eq!(t1, PhysicalType::INT32);
4946        let t2 = builder.parquet_schema().columns()[1].physical_type();
4947        assert_eq!(t2, PhysicalType::INT64);
4948        let t3 = builder.parquet_schema().columns()[2].physical_type();
4949        assert_eq!(t3, PhysicalType::INT64);
4950
4951        let mut reader = builder.build().unwrap();
4952        assert_eq!(batch.schema(), reader.schema());
4953
4954        let out = reader.next().unwrap().unwrap();
4955        assert_eq!(batch, out);
4956    }
4957
4958    fn test_decimal_roundtrip<T: DecimalType>() {
4959        // Precision <= 9 -> INT32
4960        // Precision <= 18 -> INT64
4961        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4962
4963        let d = |values: Vec<usize>, p: u8| {
4964            let iter = values.into_iter().map(T::Native::usize_as);
4965            PrimitiveArray::<T>::from_iter_values(iter)
4966                .with_precision_and_scale(p, 2)
4967                .unwrap()
4968        };
4969
4970        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4971        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4972        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4973        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4974
4975        let batch = RecordBatch::try_from_iter([
4976            ("d1", Arc::new(d1) as ArrayRef),
4977            ("d2", Arc::new(d2) as ArrayRef),
4978            ("d3", Arc::new(d3) as ArrayRef),
4979            ("d4", Arc::new(d4) as ArrayRef),
4980        ])
4981        .unwrap();
4982
4983        let mut buffer = Vec::with_capacity(1024);
4984        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4985        writer.write(&batch).unwrap();
4986        writer.close().unwrap();
4987
4988        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4989        let t1 = builder.parquet_schema().columns()[0].physical_type();
4990        assert_eq!(t1, PhysicalType::INT32);
4991        let t2 = builder.parquet_schema().columns()[1].physical_type();
4992        assert_eq!(t2, PhysicalType::INT64);
4993        let t3 = builder.parquet_schema().columns()[2].physical_type();
4994        assert_eq!(t3, PhysicalType::INT64);
4995        let t4 = builder.parquet_schema().columns()[3].physical_type();
4996        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4997
4998        let mut reader = builder.build().unwrap();
4999        assert_eq!(batch.schema(), reader.schema());
5000
5001        let out = reader.next().unwrap().unwrap();
5002        assert_eq!(batch, out);
5003    }
5004
    #[test]
    fn test_decimal() {
        // Run the decimal roundtrip helpers for every decimal width
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }
5012
    #[test]
    fn test_list_selection() {
        // Write two 1024-row batches of single-element string lists, then
        // read them back with a selection that skips the first 100 rows of
        // each written batch.
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                // each row is a one-element list containing "{i} {j}"
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // Every surviving list must contain exactly one element, so
        // consecutive value offsets differ by exactly 1
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        // The surviving values are "{i} {j}" for j in 100..1024 of each batch
        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }
5067
    #[test]
    fn test_list_selection_fuzz() {
        // Randomly generate 2048 rows of list<list<int32>> data (with nulls
        // at every nesting level), then verify that various skip/select
        // patterns and batch sizes each reproduce exact slices of the
        // original batch.
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            // ~20% of outer lists are null
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                // ~20% of inner lists are null
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    // ~20% of leaf values are null
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        // Selection patterns: interior/leading/trailing skips, including
        // single-row selections at page-ish boundaries
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Walk the selectors, comparing each selected range against
                // the corresponding slice of the original batch
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
5181
    #[test]
    fn test_read_old_nested_list() {
        // Legacy (pre-standard) two-level list encoding should still decode
        // to a modern list<list<int32>> Arrow array.
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        // message my_record {
        //     REQUIRED group a (LIST) {
        //         REPEATED group array (LIST) {
        //             REPEATED INT32 array;
        //         }
        //     }
        // }
        // should be read as list<list<int32>>
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // create expected ListArray
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // grab first column
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        // get first row: [[1, 2], [3, 4]]
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }
5231
5232    #[test]
5233    fn test_map_no_value() {
5234        // File schema:
5235        // message schema {
5236        //   required group my_map (MAP) {
5237        //     repeated group key_value {
5238        //       required int32 key;
5239        //       optional int32 value;
5240        //     }
5241        //   }
5242        //   required group my_map_no_v (MAP) {
5243        //     repeated group key_value {
5244        //       required int32 key;
5245        //     }
5246        //   }
5247        //   required group my_list (LIST) {
5248        //     repeated group list {
5249        //       required int32 element;
5250        //     }
5251        //   }
5252        // }
5253        let testdata = arrow::util::test_util::parquet_test_data();
5254        let path = format!("{testdata}/map_no_value.parquet");
5255        let file = File::open(path).unwrap();
5256
5257        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5258            .unwrap()
5259            .build()
5260            .unwrap();
5261        let out = reader.next().unwrap().unwrap();
5262        assert_eq!(out.num_rows(), 3);
5263        assert_eq!(out.num_columns(), 3);
5264        // my_map_no_v and my_list columns should now be equivalent
5265        let c0 = out.column(1).as_list::<i32>();
5266        let c1 = out.column(2).as_list::<i32>();
5267        assert_eq!(c0.len(), c1.len());
5268        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5269    }
5270
5271    #[test]
5272    fn test_read_unknown_logical_type() {
5273        let testdata = arrow::util::test_util::parquet_test_data();
5274        let path = format!("{testdata}/unknown-logical-type.parquet");
5275        let test_file = File::open(path).unwrap();
5276
5277        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5278            .expect("Error creating reader builder");
5279
5280        let schema = builder.metadata().file_metadata().schema_descr();
5281        assert_eq!(
5282            schema.column(0).logical_type_ref(),
5283            Some(&LogicalType::String)
5284        );
5285        assert_eq!(
5286            schema.column(1).logical_type_ref(),
5287            Some(&LogicalType::_Unknown { field_id: 2555 })
5288        );
5289        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5290
5291        let mut reader = builder.build().unwrap();
5292        let out = reader.next().unwrap().unwrap();
5293        assert_eq!(out.num_rows(), 3);
5294        assert_eq!(out.num_columns(), 2);
5295    }
5296
    #[test]
    fn test_read_row_numbers() {
        // Reading with a virtual row-number column should append it after
        // the file's real columns.
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        // Virtual column is marked via the RowNumber extension type
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        // Expected output schema: the real column followed by the virtual one
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        // Row numbers are 0-based file positions
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }
5347
    #[test]
    fn test_read_only_row_numbers() {
        // An empty projection plus a virtual row-number column should yield
        // batches containing only the row numbers.
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        // Column count of the physical schema, used to build an all-false mask
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Project no real columns at all
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        // Row numbers are 0-based file positions
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }
5387
5388    #[test]
5389    fn test_read_row_numbers_row_group_order() -> Result<()> {
5390        // Make a parquet file with 100 rows split across 2 row groups
5391        let array = Int64Array::from_iter_values(5000..5100);
5392        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5393        let mut buffer = Vec::new();
5394        let options = WriterProperties::builder()
5395            .set_max_row_group_row_count(Some(50))
5396            .build();
5397        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5398        // write in 10 row batches as the size limits are enforced after each batch
5399        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5400            writer.write(&batch_chunk)?;
5401        }
5402        writer.close()?;
5403
5404        let row_number_field = Arc::new(
5405            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5406        );
5407
5408        let buffer = Bytes::from(buffer);
5409
5410        let options =
5411            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5412
5413        // read out with normal options
5414        let arrow_reader =
5415            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5416                .build()?;
5417
5418        assert_eq!(
5419            ValuesAndRowNumbers {
5420                values: (5000..5100).collect(),
5421                row_numbers: (0..100).collect()
5422            },
5423            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5424        );
5425
5426        // Now read, out of order row groups
5427        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5428            .with_row_groups(vec![1, 0])
5429            .build()?;
5430
5431        assert_eq!(
5432            ValuesAndRowNumbers {
5433                values: (5050..5100).chain(5000..5050).collect(),
5434                row_numbers: (50..100).chain(0..50).collect(),
5435            },
5436            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5437        );
5438
5439        Ok(())
5440    }
5441
    /// Pair of decoded column contents used by the row-number tests to assert
    /// that each value stays aligned with its virtual row number.
    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        // contents of the "col" column
        values: Vec<i64>,
        // contents of the virtual "row_number" column
        row_numbers: Vec<i64>,
    }
5447    impl ValuesAndRowNumbers {
5448        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5449            let mut values = vec![];
5450            let mut row_numbers = vec![];
5451            for batch in reader {
5452                let batch = batch.expect("Could not read batch");
5453                values.extend(
5454                    batch
5455                        .column_by_name("col")
5456                        .expect("Could not get col column")
5457                        .as_primitive::<arrow::datatypes::Int64Type>()
5458                        .iter()
5459                        .map(|v| v.expect("Could not get value")),
5460                );
5461
5462                row_numbers.extend(
5463                    batch
5464                        .column_by_name("row_number")
5465                        .expect("Could not get row_number column")
5466                        .as_primitive::<arrow::datatypes::Int64Type>()
5467                        .iter()
5468                        .map(|v| v.expect("Could not get row number"))
5469                        .collect::<Vec<_>>(),
5470                );
5471            }
5472            Self {
5473                values,
5474                row_numbers,
5475            }
5476        }
5477    }
5478
5479    #[test]
5480    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5481        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5482        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5483        assert_eq!(
5484            ArrowReaderOptions::new()
5485                .with_virtual_columns(vec![regular_field])
5486                .unwrap_err()
5487                .to_string(),
5488            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5489        );
5490    }
5491
5492    #[test]
5493    fn test_row_numbers_with_multiple_row_groups() {
5494        test_row_numbers_with_multiple_row_groups_helper(
5495            false,
5496            |path, selection, _row_filter, batch_size| {
5497                let file = File::open(path).unwrap();
5498                let row_number_field = Arc::new(
5499                    Field::new("row_number", ArrowDataType::Int64, false)
5500                        .with_extension_type(RowNumber),
5501                );
5502                let options = ArrowReaderOptions::new()
5503                    .with_virtual_columns(vec![row_number_field])
5504                    .unwrap();
5505                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5506                    .unwrap()
5507                    .with_row_selection(selection)
5508                    .with_batch_size(batch_size)
5509                    .build()
5510                    .expect("Could not create reader");
5511                reader
5512                    .collect::<Result<Vec<_>, _>>()
5513                    .expect("Could not read")
5514            },
5515        );
5516    }
5517
5518    #[test]
5519    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5520        test_row_numbers_with_multiple_row_groups_helper(
5521            true,
5522            |path, selection, row_filter, batch_size| {
5523                let file = File::open(path).unwrap();
5524                let row_number_field = Arc::new(
5525                    Field::new("row_number", ArrowDataType::Int64, false)
5526                        .with_extension_type(RowNumber),
5527                );
5528                let options = ArrowReaderOptions::new()
5529                    .with_virtual_columns(vec![row_number_field])
5530                    .unwrap();
5531                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5532                    .unwrap()
5533                    .with_row_selection(selection)
5534                    .with_batch_size(batch_size)
5535                    .with_row_filter(row_filter.expect("No filter"))
5536                    .build()
5537                    .expect("Could not create reader");
5538                reader
5539                    .collect::<Result<Vec<_>, _>>()
5540                    .expect("Could not read")
5541            },
5542        );
5543    }
5544
5545    #[test]
5546    fn test_read_row_group_indices() {
5547        // create a parquet file with 3 row groups, 2 rows each
5548        let array1 = Int64Array::from(vec![1, 2]);
5549        let array2 = Int64Array::from(vec![3, 4]);
5550        let array3 = Int64Array::from(vec![5, 6]);
5551
5552        let batch1 =
5553            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5554        let batch2 =
5555            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5556        let batch3 =
5557            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5558
5559        let mut buffer = Vec::new();
5560        let options = WriterProperties::builder()
5561            .set_max_row_group_row_count(Some(2))
5562            .build();
5563        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5564        writer.write(&batch1).unwrap();
5565        writer.write(&batch2).unwrap();
5566        writer.write(&batch3).unwrap();
5567        writer.close().unwrap();
5568
5569        let file = Bytes::from(buffer);
5570        let row_group_index_field = Arc::new(
5571            Field::new("row_group_index", ArrowDataType::Int64, false)
5572                .with_extension_type(RowGroupIndex),
5573        );
5574
5575        let options = ArrowReaderOptions::new()
5576            .with_virtual_columns(vec![row_group_index_field.clone()])
5577            .unwrap();
5578        let mut arrow_reader =
5579            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5580                .expect("reader builder with virtual columns")
5581                .build()
5582                .expect("reader with virtual columns");
5583
5584        let batch = arrow_reader.next().unwrap().unwrap();
5585
5586        assert_eq!(batch.num_columns(), 2);
5587        assert_eq!(batch.num_rows(), 6);
5588
5589        assert_eq!(
5590            batch
5591                .column(0)
5592                .as_primitive::<types::Int64Type>()
5593                .iter()
5594                .collect::<Vec<_>>(),
5595            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5596        );
5597
5598        assert_eq!(
5599            batch
5600                .column(1)
5601                .as_primitive::<types::Int64Type>()
5602                .iter()
5603                .collect::<Vec<_>>(),
5604            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5605        );
5606    }
5607
5608    #[test]
5609    fn test_read_only_row_group_indices() {
5610        let array1 = Int64Array::from(vec![1, 2, 3]);
5611        let array2 = Int64Array::from(vec![4, 5]);
5612
5613        let batch1 =
5614            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5615        let batch2 =
5616            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5617
5618        let mut buffer = Vec::new();
5619        let options = WriterProperties::builder()
5620            .set_max_row_group_row_count(Some(3))
5621            .build();
5622        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5623        writer.write(&batch1).unwrap();
5624        writer.write(&batch2).unwrap();
5625        writer.close().unwrap();
5626
5627        let file = Bytes::from(buffer);
5628        let row_group_index_field = Arc::new(
5629            Field::new("row_group_index", ArrowDataType::Int64, false)
5630                .with_extension_type(RowGroupIndex),
5631        );
5632
5633        let options = ArrowReaderOptions::new()
5634            .with_virtual_columns(vec![row_group_index_field.clone()])
5635            .unwrap();
5636        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5637        let num_columns = metadata
5638            .metadata
5639            .file_metadata()
5640            .schema_descr()
5641            .num_columns();
5642
5643        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5644            .with_projection(ProjectionMask::none(num_columns))
5645            .build()
5646            .expect("reader with virtual columns only");
5647
5648        let batch = arrow_reader.next().unwrap().unwrap();
5649        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5650
5651        assert_eq!(batch.schema(), schema);
5652        assert_eq!(batch.num_columns(), 1);
5653        assert_eq!(batch.num_rows(), 5);
5654
5655        assert_eq!(
5656            batch
5657                .column(0)
5658                .as_primitive::<types::Int64Type>()
5659                .iter()
5660                .collect::<Vec<_>>(),
5661            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5662        );
5663    }
5664
5665    #[test]
5666    fn test_read_row_group_indices_with_selection() -> Result<()> {
5667        let mut buffer = Vec::new();
5668        let options = WriterProperties::builder()
5669            .set_max_row_group_row_count(Some(10))
5670            .build();
5671
5672        let schema = Arc::new(Schema::new(vec![Field::new(
5673            "value",
5674            ArrowDataType::Int64,
5675            false,
5676        )]));
5677
5678        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5679
5680        // write out 3 batches of 10 rows each
5681        for i in 0..3 {
5682            let start = i * 10;
5683            let array = Int64Array::from_iter_values(start..start + 10);
5684            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5685            writer.write(&batch)?;
5686        }
5687        writer.close()?;
5688
5689        let file = Bytes::from(buffer);
5690        let row_group_index_field = Arc::new(
5691            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5692        );
5693
5694        let options =
5695            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5696
5697        // test row groups are read in reverse order
5698        let arrow_reader =
5699            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5700                .with_row_groups(vec![2, 1, 0])
5701                .build()?;
5702
5703        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5704        let combined = concat_batches(&batches[0].schema(), &batches)?;
5705
5706        let values = combined.column(0).as_primitive::<types::Int64Type>();
5707        let first_val = values.value(0);
5708        let last_val = values.value(combined.num_rows() - 1);
5709        // first row from rg 2
5710        assert_eq!(first_val, 20);
5711        // the last row from rg 0
5712        assert_eq!(last_val, 9);
5713
5714        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5715        assert_eq!(rg_indices.value(0), 2);
5716        assert_eq!(rg_indices.value(10), 1);
5717        assert_eq!(rg_indices.value(20), 0);
5718
5719        Ok(())
5720    }
5721
    /// Shared driver for row-number tests that span multiple row groups.
    ///
    /// Writes a random parquet file whose "value" column equals
    /// `row_number + 1` (see [`generate_file_with_row_numbers`]), builds a
    /// random `RowSelection` covering the whole file (and, when `use_filter`
    /// is set, a random `RowFilter`), then invokes `test_case` with the file
    /// path, selection, optional filter, and a random batch size. The batches
    /// it returns are checked for value/row-number alignment.
    ///
    /// The random seed is printed so failures can be reproduced.
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        // Build random select/skip runs until every row in the file is
        // accounted for
        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        // Optionally build a filter that keeps ~99% of rows. The predicate
        // closure is stateful: `filter_offset` advances in step with the
        // batches it is fed, so each batch consumes its own slice of `filter`.
        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        // If the random selection skipped everything, only empty batches can
        // come back and there is nothing further to verify
        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        // NOTE(review): disabled — presumably because the optional row filter
        // can drop rows, making the output smaller than the selection; confirm
        // assert_eq!(selection.row_count(), actual.num_rows());
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // The file was written with value == row_number + 1, so the pairing
        // must survive any combination of selection and filtering
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }
5800
5801    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5802        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5803            "value",
5804            ArrowDataType::Int64,
5805            false,
5806        )])));
5807
5808        let mut buf = Vec::with_capacity(1024);
5809        let mut writer =
5810            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5811
5812        let mut values = 1..=rng.random_range(1..4096);
5813        while !values.is_empty() {
5814            let batch_values = values
5815                .by_ref()
5816                .take(rng.random_range(1..4096))
5817                .collect::<Vec<_>>();
5818            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5819            let batch =
5820                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5821            writer.write(&batch).expect("Could not write batch");
5822            writer.flush().expect("Could not flush");
5823        }
5824        let metadata = writer.close().expect("Could not close writer");
5825
5826        (Bytes::from(buf), metadata)
5827    }
5828}