
parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
72/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the lowest level optimizations.
///
/// However, most Parquet based systems will apply filters at many steps prior
/// to decoding, such as pruning files, row groups and data pages. This crate
/// provides the low level APIs needed to implement such filtering, but does not
/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc.)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If `batch_size` is greater than the number of rows in the file, the file's row count is used instead.
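    ///
    /// For example, a minimal sketch (the `data.parquet` path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// // Produce batches of (up to) 8192 rows instead of the default 1024
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_batch_size(8192)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```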
201    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
203        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
204        Self { batch_size, ..self }
205    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
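    ///
    /// For example, a sketch that reads only the first and third row groups
    /// (the `data.parquet` path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_row_groups(vec![0, 2])
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```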
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
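    ///
    /// For example, a sketch that reads only the first and third leaf columns
    /// (the `data.parquet` path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// // Select leaf columns by their index in the parquet schema
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
    /// let reader = builder.with_projection(mask).build()?;
    /// # Ok(())
    /// # }
    /// ```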
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
225    /// Configure how row selections should be materialised during execution
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
250    /// It is recommended to enable writing the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
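    /// The selection above could be constructed like this (a sketch of the
    /// example described in this documentation):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```
    ///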
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
307    ///
308    /// See the [blog post on late materialization] for a more technical explanation.
309    ///
310    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
311    ///
312    /// # Example
313    /// ```rust
314    /// # use std::fs::File;
315    /// # use arrow_array::Int32Array;
316    /// # use parquet::arrow::ProjectionMask;
317    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
318    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
319    /// # let testdata = arrow::util::test_util::parquet_test_data();
320    /// # let path = format!("{testdata}/alltypes_plain.parquet");
321    /// # let file = File::open(&path)?;
322    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
323    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
324    /// // Create predicate that evaluates `int_col != 1`.
325    /// // `int_col` column has index 4 (zero based) in the schema
326    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
327    /// // Only the projection columns are passed to the predicate so
328    /// // int_col is column 0 in the predicate
329    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
330    ///     let int_col = batch.column(0);
331    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
332    /// });
333    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
334    /// // The filter will be invoked during the reading process
335    /// let reader = builder.with_row_filter(row_filter).build()?;
336    /// # for b in reader { let _ = b?; }
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn with_row_filter(self, filter: RowFilter) -> Self {
341        Self {
342            filter: Some(filter),
343            ..self
344        }
345    }
346
347    /// Provide a limit to the number of rows to be read
348    ///
349    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
350    /// allowing it to limit the final set of rows decoded after any pushed down predicates
351    ///
352    /// It is recommended to enable reading the page index if using this functionality, to allow
353    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
354    pub fn with_limit(self, limit: usize) -> Self {
355        Self {
356            limit: Some(limit),
357            ..self
358        }
359    }
360
361    /// Provide an offset to skip over the given number of rows
362    ///
363    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
364    /// allowing it to skip rows after any pushed down predicates
365    ///
366    /// It is recommended to enable reading the page index if using this functionality, to allow
367    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
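    ///
    /// For example, a sketch that skips the first 100 rows passing any pushed
    /// down predicates and then reads at most 10 rows (the `data.parquet`
    /// path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_offset(100)
    ///     .with_limit(10)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```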
368    pub fn with_offset(self, offset: usize) -> Self {
369        Self {
370            offset: Some(offset),
371            ..self
372        }
373    }
374
375    /// Specify metrics collection during reading
376    ///
377    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
378    /// clone of the provided metrics to the builder.
379    ///
380    /// For example:
381    ///
382    /// ```rust
383    /// # use std::sync::Arc;
384    /// # use bytes::Bytes;
385    /// # use arrow_array::{Int32Array, RecordBatch};
386    /// # use arrow_schema::{DataType, Field, Schema};
387    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
388    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
389    /// # use parquet::arrow::ArrowWriter;
390    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// # let file = Bytes::from(file);
397    /// // Create metrics object to pass into the reader
398    /// let metrics = ArrowReaderMetrics::enabled();
399    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
400    ///   // Configure the builder to use the metrics by passing a clone
401    ///   .with_metrics(metrics.clone())
402    ///   // Build the reader
403    ///   .build().unwrap();
404    /// // .. read data from the reader ..
405    ///
406    /// // check the metrics
407    /// assert!(metrics.records_read_from_inner().is_some());
408    /// ```
409    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
410        Self { metrics, ..self }
411    }
412
413    /// Set the maximum size (per row group) of the predicate cache in bytes for
414    /// the async decoder.
415    ///
416    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
417    /// unlimited cache size.
418    ///
419    /// This cache is used to store decoded arrays that are used in
420    /// predicate evaluation ([`Self::with_row_filter`]).
421    ///
422    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
423    /// [this ticket] for more details and alternatives.
424    ///
425    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
426    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
427    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
428        Self {
429            max_predicate_cache_size,
430            ..self
431        }
432    }
433}
434
435/// Options that control how [`ParquetMetaData`] is read when constructing
436/// an Arrow reader.
437///
438/// To use these options, pass them to one of the following methods:
439/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
440/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
441///
/// For fine-grained control over metadata loading, use
/// [`ArrowReaderMetadata::load`] to load metadata with these options.
///
/// See [`ArrowReaderBuilder`] for configuring how the column data
/// is then read from the file, including projection and filter pushdown.
447///
448/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
449#[derive(Debug, Clone, Default)]
450pub struct ArrowReaderOptions {
451    /// Should the reader strip any user defined metadata from the Arrow schema
452    skip_arrow_metadata: bool,
453    /// If provided, used as the schema hint when determining the Arrow schema,
454    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
455    ///
456    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
457    supplied_schema: Option<SchemaRef>,
458
459    pub(crate) column_index: PageIndexPolicy,
460    pub(crate) offset_index: PageIndexPolicy,
461
462    /// Options to control reading of Parquet metadata
463    metadata_options: ParquetMetaDataOptions,
464    /// If encryption is enabled, the file decryption properties can be provided
465    #[cfg(feature = "encryption")]
466    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
467
468    virtual_columns: Vec<FieldRef>,
469}
470
471impl ArrowReaderOptions {
472    /// Create a new [`ArrowReaderOptions`] with the default settings
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    /// Skip decoding the embedded arrow metadata (defaults to `false`)
478    ///
    /// Parquet files generated by some writers may contain an embedded arrow
    /// schema and metadata.
    /// This may not be correct or compatible with your system;
    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184).
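    ///
    /// For example, a minimal sketch:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// // Ignore any Arrow schema embedded in the file and re-infer the Arrow
    /// // schema from the Parquet schema instead
    /// let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    /// ```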
483    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
484        Self {
485            skip_arrow_metadata,
486            ..self
487        }
488    }
489
490    /// Provide a schema hint to use when reading the Parquet file.
491    ///
492    /// If provided, this schema takes precedence over any arrow schema embedded
493    /// in the metadata (see the [`arrow`] documentation for more details).
494    ///
    /// If the provided schema is not compatible with the schema of the data
    /// stored in the parquet file, an error will be returned when constructing
    /// the builder.
    ///
    /// This option is only required if you want to explicitly control the
    /// conversion of Parquet types to Arrow types, such as casting a column to
    /// a different type. For example, you might want to read an Int64 column
    /// in a Parquet file as a [`TimestampMicrosecondArray`] in the Arrow schema.
503    ///
504    /// [`arrow`]: crate::arrow
505    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
506    ///
507    /// # Notes
508    ///
509    /// The provided schema must have the same number of columns as the parquet schema and
510    /// the column names must be the same.
511    ///
512    /// # Example
513    /// ```
514    /// # use std::sync::Arc;
515    /// # use bytes::Bytes;
516    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
517    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
518    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
519    /// # use parquet::arrow::ArrowWriter;
520    /// // Write data - schema is inferred from the data to be Int32
521    /// let mut file = Vec::new();
522    /// let batch = RecordBatch::try_from_iter(vec![
523    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
524    /// ]).unwrap();
525    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
526    /// writer.write(&batch).unwrap();
527    /// writer.close().unwrap();
528    /// let file = Bytes::from(file);
529    ///
530    /// // Read the file back.
531    /// // Supply a schema that interprets the Int32 column as a Timestamp.
532    /// let supplied_schema = Arc::new(Schema::new(vec![
533    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
534    /// ]));
535    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
536    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
537    ///     file.clone(),
538    ///     options
539    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
540    ///
541    /// // Create the reader and read the data using the supplied schema.
542    /// let mut reader = builder.build().unwrap();
543    /// let _batch = reader.next().unwrap().unwrap();
544    /// ```
545    ///
546    /// # Example: Preserving Dictionary Encoding
547    ///
    /// By default, Parquet string columns are read as `StringArray` (or `LargeStringArray`),
549    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
550    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
551    ///
552    /// ```
553    /// # use std::sync::Arc;
554    /// # use bytes::Bytes;
555    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
556    /// # use arrow_schema::{DataType, Field, Schema};
557    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
558    /// # use parquet::arrow::ArrowWriter;
559    /// // Write a Parquet file with string data
560    /// let mut file = Vec::new();
561    /// let schema = Arc::new(Schema::new(vec![
562    ///     Field::new("city", DataType::Utf8, false)
563    /// ]));
564    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
565    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
566    ///
567    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
568    /// writer.write(&batch).unwrap();
569    /// writer.close().unwrap();
570    /// let file = Bytes::from(file);
571    ///
572    /// // Read the file back, requesting dictionary encoding preservation
573    /// let dict_schema = Arc::new(Schema::new(vec![
574    ///     Field::new("city", DataType::Dictionary(
575    ///         Box::new(DataType::Int32),
576    ///         Box::new(DataType::Utf8)
577    ///     ), false)
578    /// ]));
579    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
580    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
581    ///     file.clone(),
582    ///     options
583    /// ).unwrap();
584    ///
585    /// let mut reader = builder.build().unwrap();
586    /// let batch = reader.next().unwrap().unwrap();
587    ///
588    /// // The column is now a DictionaryArray
589    /// assert!(matches!(
590    ///     batch.column(0).data_type(),
591    ///     DataType::Dictionary(_, _)
592    /// ));
593    /// ```
594    ///
595    /// **Note**: Dictionary encoding preservation works best when:
596    /// 1. The original column was dictionary encoded (the default for string columns)
597    /// 2. There are a small number of distinct values
598    pub fn with_schema(self, schema: SchemaRef) -> Self {
599        Self {
600            supplied_schema: Some(schema),
601            skip_arrow_metadata: true,
602            ..self
603        }
604    }
605
606    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
607    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
608    ///
    /// The `PageIndex` can be used by some query engines to push down predicates
    /// to the parquet scan, potentially eliminating unnecessary IO.
611    ///
612    /// If this is enabled, [`ParquetMetaData::column_index`] and
613    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
614    /// information is present in the file.
615    ///
616    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
617    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
618    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
619    pub fn with_page_index(self, page_index: bool) -> Self {
620        self.with_page_index_policy(PageIndexPolicy::from(page_index))
621    }
622
623    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
624    ///
625    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
626    /// This method sets the same policy for both. For fine-grained control, use
627    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
628    ///
629    /// See [`Self::with_page_index`] for more details on page indexes.
630    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
631        self.with_column_index_policy(policy)
632            .with_offset_index_policy(policy)
633    }
634
635    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
636    ///
637    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
638    /// for predicate pushdown and page-level pruning.
639    ///
640    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
641    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
642        self.column_index = policy;
643        self
644    }
645
646    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
647    ///
648    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
649    /// efficient page-level skipping and random access within column chunks.
650    ///
651    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
652    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
653        self.offset_index = policy;
654        self
655    }
656
657    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
658    /// footer will be skipped.
659    ///
660    /// This can be used to avoid reparsing the schema from the file when it is
661    /// already known.
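    ///
    /// For example, a sketch that loads the schema from one file and reuses it
    /// when decoding the metadata of another file with the same schema (the
    /// `a.parquet` path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// let file_a = File::open("a.parquet")?;
    /// let metadata = ArrowReaderMetadata::load(&file_a, ArrowReaderOptions::new())?;
    /// let schema = metadata.metadata().file_metadata().schema_descr_ptr();
    /// // Skip re-parsing the schema when decoding the metadata of another file
    /// let options = ArrowReaderOptions::new().with_parquet_schema(schema);
    /// # Ok(())
    /// # }
    /// ```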
662    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
663        self.metadata_options.set_schema(schema);
664        self
665    }
666
667    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
668    /// (defaults to `false`).
669    ///
670    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
671    /// might be desirable.
672    ///
673    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
674    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
675    /// [`encoding_stats`]:
676    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
677    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
678        self.metadata_options.set_encoding_stats_as_mask(val);
679        self
680    }
681
682    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
683    ///
684    /// [`encoding_stats`]:
685    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
686    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
687        self.metadata_options.set_encoding_stats_policy(policy);
688        self
689    }
690
691    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
692    ///
693    /// [`statistics`]:
694    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
695    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
696        self.metadata_options.set_column_stats_policy(policy);
697        self
698    }
699
700    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
701    ///
702    /// [`size_statistics`]:
703    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
704    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
705        self.metadata_options.set_size_stats_policy(policy);
706        self
707    }
708
709    /// Provide the file decryption properties to use when reading encrypted parquet files.
710    ///
711    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
712    #[cfg(feature = "encryption")]
713    pub fn with_file_decryption_properties(
714        self,
715        file_decryption_properties: Arc<FileDecryptionProperties>,
716    ) -> Self {
717        Self {
718            file_decryption_properties: Some(file_decryption_properties),
719            ..self
720        }
721    }
722
723    /// Include virtual columns in the output.
724    ///
    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader, such as row numbers and row group indices.
726    ///
727    /// # Example
728    /// ```
729    /// # use std::sync::Arc;
730    /// # use bytes::Bytes;
731    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
732    /// # use arrow_schema::{DataType, Field, Schema};
733    /// # use parquet::arrow::{ArrowWriter, RowNumber};
734    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
735    /// #
736    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
737    /// // Create a simple record batch with some data
738    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
739    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
740    ///
741    /// // Write the batch to an in-memory buffer
742    /// let mut file = Vec::new();
743    /// let mut writer = ArrowWriter::try_new(
744    ///     &mut file,
745    ///     batch.schema(),
746    ///     None
747    /// )?;
748    /// writer.write(&batch)?;
749    /// writer.close()?;
750    /// let file = Bytes::from(file);
751    ///
752    /// // Create a virtual column for row numbers
753    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
754    ///     .with_extension_type(RowNumber));
755    ///
756    /// // Configure options with virtual columns
757    /// let options = ArrowReaderOptions::new()
758    ///     .with_virtual_columns(vec![row_number_field])?;
759    ///
760    /// // Create a reader with the options
761    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
762    ///     file,
763    ///     options
764    /// )?
765    /// .build()?;
766    ///
767    /// // Read the batch - it will include both the original column and the virtual row_number column
768    /// let result_batch = reader.next().unwrap()?;
769    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
770    /// assert_eq!(result_batch.num_rows(), 3);
771    /// #
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
776        // Validate that all fields are virtual columns
777        for field in &virtual_columns {
778            if !is_virtual_column(field) {
779                return Err(ParquetError::General(format!(
780                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
781                    field.name()
782                )));
783            }
784        }
785        Ok(Self {
786            virtual_columns,
787            ..self
788        })
789    }
790
791    #[deprecated(
792        since = "57.2.0",
793        note = "Use `column_index_policy` or `offset_index_policy` instead"
794    )]
795    /// Returns whether page index reading is enabled.
796    ///
797    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
798    ///
799    /// This can be set via [`with_page_index`][Self::with_page_index] or
800    /// [`with_page_index_policy`][Self::with_page_index_policy].
801    pub fn page_index(&self) -> bool {
802        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
803    }
804
805    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
806    ///
807    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
808    /// or [`with_page_index_policy`][Self::with_page_index_policy].
809    pub fn offset_index_policy(&self) -> PageIndexPolicy {
810        self.offset_index
811    }
812
813    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
814    ///
815    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
816    /// or [`with_page_index_policy`][Self::with_page_index_policy].
817    pub fn column_index_policy(&self) -> PageIndexPolicy {
818        self.column_index
819    }
820
821    /// Retrieve the currently set metadata decoding options.
822    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
823        &self.metadata_options
824    }
825
826    /// Retrieve the currently set file decryption properties.
827    ///
828    /// This can be set via
829    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
830    #[cfg(feature = "encryption")]
831    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
832        self.file_decryption_properties.as_ref()
833    }
834}
835
836/// The metadata necessary to construct a [`ArrowReaderBuilder`]
837///
/// Note this structure is cheap to clone as it consists of several `Arc`s.
839///
840/// This structure allows
841///
842/// 1. Loading metadata for a file once and then using that same metadata to
843///    construct multiple separate readers, for example, to distribute readers
844///    across multiple threads
845///
846/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
847///    from the file each time a reader is constructed.
848///
849/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
850#[derive(Debug, Clone)]
851pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
853    pub(crate) metadata: Arc<ParquetMetaData>,
854    /// The Arrow Schema
855    pub(crate) schema: SchemaRef,
856    /// The Parquet schema (root field)
857    pub(crate) fields: Option<Arc<ParquetField>>,
858}
859
860impl ArrowReaderMetadata {
861    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
862    /// and [`ChunkReader`]
863    ///
864    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
865    /// example of how this can be used
866    ///
867    /// # Notes
868    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to `true`, but
    /// `Self::metadata` is missing the page index, this function will attempt
    /// to load the page index from the provided `reader`.
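    ///
    /// For example, a minimal sketch (the `data.parquet` path is illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// let file = File::open("data.parquet")?;
    /// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
    /// println!("file has {} row groups", metadata.metadata().num_row_groups());
    /// # Ok(())
    /// # }
    /// ```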
872    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
873        let metadata = ParquetMetaDataReader::new()
874            .with_column_index_policy(options.column_index)
875            .with_offset_index_policy(options.offset_index)
876            .with_metadata_options(Some(options.metadata_options.clone()));
877        #[cfg(feature = "encryption")]
878        let metadata = metadata.with_decryption_properties(
879            options.file_decryption_properties.as_ref().map(Arc::clone),
880        );
881        let metadata = metadata.parse_and_finish(reader)?;
882        Self::try_new(Arc::new(metadata), options)
883    }
884
885    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
886    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
887    ///
888    /// # Notes
889    ///
890    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
891    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
892    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
893        match options.supplied_schema {
894            Some(supplied_schema) => Self::with_supplied_schema(
895                metadata,
896                supplied_schema.clone(),
897                &options.virtual_columns,
898            ),
899            None => {
900                let kv_metadata = match options.skip_arrow_metadata {
901                    true => None,
902                    false => metadata.file_metadata().key_value_metadata(),
903                };
904
905                let (schema, fields) = parquet_to_arrow_schema_and_fields(
906                    metadata.file_metadata().schema_descr(),
907                    ProjectionMask::all(),
908                    kv_metadata,
909                    &options.virtual_columns,
910                )?;
911
912                Ok(Self {
913                    metadata,
914                    schema: Arc::new(schema),
915                    fields: fields.map(Arc::new),
916                })
917            }
918        }
919    }
920
921    fn with_supplied_schema(
922        metadata: Arc<ParquetMetaData>,
923        supplied_schema: SchemaRef,
924        virtual_columns: &[FieldRef],
925    ) -> Result<Self> {
926        let parquet_schema = metadata.file_metadata().schema_descr();
927        let field_levels = parquet_to_arrow_field_levels_with_virtual(
928            parquet_schema,
929            ProjectionMask::all(),
930            Some(supplied_schema.fields()),
931            virtual_columns,
932        )?;
933        let fields = field_levels.fields;
934        let inferred_len = fields.len();
935        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
936        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
938        // different lengths, but we check here to be safe.
939        if inferred_len != supplied_len {
940            return Err(arrow_err!(format!(
941                "Incompatible supplied Arrow schema: expected {} columns received {}",
942                inferred_len, supplied_len
943            )));
944        }
945
946        let mut errors = Vec::new();
947
948        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
949
950        for (field1, field2) in field_iter {
951            if field1.data_type() != field2.data_type() {
952                errors.push(format!(
953                    "data type mismatch for field {}: requested {} but found {}",
954                    field1.name(),
955                    field1.data_type(),
956                    field2.data_type()
957                ));
958            }
959            if field1.is_nullable() != field2.is_nullable() {
960                errors.push(format!(
961                    "nullability mismatch for field {}: expected {:?} but found {:?}",
962                    field1.name(),
963                    field1.is_nullable(),
964                    field2.is_nullable()
965                ));
966            }
967            if field1.metadata() != field2.metadata() {
968                errors.push(format!(
969                    "metadata mismatch for field {}: expected {:?} but found {:?}",
970                    field1.name(),
971                    field1.metadata(),
972                    field2.metadata()
973                ));
974            }
975        }
976
977        if !errors.is_empty() {
978            let message = errors.join(", ");
979            return Err(ParquetError::ArrowError(format!(
980                "Incompatible supplied Arrow schema: {message}",
981            )));
982        }
983
984        Ok(Self {
985            metadata,
986            schema: supplied_schema,
987            fields: field_levels.levels.map(Arc::new),
988        })
989    }
990
991    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
992    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
993        &self.metadata
994    }
995
996    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
997    pub fn parquet_schema(&self) -> &SchemaDescriptor {
998        self.metadata.file_metadata().schema_descr()
999    }
1000
1001    /// Returns the arrow [`SchemaRef`] for this parquet file
1002    pub fn schema(&self) -> &SchemaRef {
1003        &self.schema
1004    }
1005}
1006
1007#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
1009pub struct SyncReader<T: ChunkReader>(T);
1010
1011impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1012    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1013        f.debug_tuple("SyncReader").field(&self.0).finish()
1014    }
1015}
1016
1017/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
1018///
1019/// # See Also
1020/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
1021/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
1022/// * [`ArrowReaderBuilder`] for additional member functions
1023pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1024
1025impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1026    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1027    ///
1028    /// ```
1029    /// # use std::sync::Arc;
1030    /// # use bytes::Bytes;
1031    /// # use arrow_array::{Int32Array, RecordBatch};
1032    /// # use arrow_schema::{DataType, Field, Schema};
1033    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1034    /// # use parquet::arrow::ArrowWriter;
1035    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1036    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1037    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1038    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1039    /// # writer.write(&batch).unwrap();
1040    /// # writer.close().unwrap();
1041    /// # let file = Bytes::from(file);
1042    /// // Build the reader from anything that implements `ChunkReader`
1043    /// // such as a `File`, or `Bytes`
1044    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045    /// // The builder has access to ParquetMetaData such
1046    /// // as the number and layout of row groups
1047    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1048    /// // Call build to create the reader
1049    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1050    /// // Read data
1051    /// while let Some(batch) = reader.next().transpose()? {
1052    ///     println!("Read {} rows", batch.num_rows());
1053    /// }
1054    /// # Ok::<(), parquet::errors::ParquetError>(())
1055    /// ```
1056    pub fn try_new(reader: T) -> Result<Self> {
1057        Self::try_new_with_options(reader, Default::default())
1058    }
1059
1060    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1061    ///
1062    /// Use this method if you want to control the options for reading the
1063    /// [`ParquetMetaData`]
1064    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1065        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1066        Ok(Self::new_with_metadata(reader, metadata))
1067    }
1068
1069    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1070    ///
1071    /// Use this method if you already have [`ParquetMetaData`] for a file.
1072    /// This interface allows:
1073    ///
1074    /// 1. Loading metadata once and using it to create multiple builders with
1075    ///    potentially different settings or run on different threads
1076    ///
1077    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1078    ///    file each time a reader is constructed.
1079    ///
1080    /// See the docs on [`ArrowReaderMetadata`] for more details
1081    ///
1082    /// # Example
1083    /// ```
1084    /// # use std::fs::metadata;
1085    /// # use std::sync::Arc;
1086    /// # use bytes::Bytes;
1087    /// # use arrow_array::{Int32Array, RecordBatch};
1088    /// # use arrow_schema::{DataType, Field, Schema};
1089    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1090    /// # use parquet::arrow::ArrowWriter;
1091    /// #
1092    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1093    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1094    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1095    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1096    /// # writer.write(&batch).unwrap();
1097    /// # writer.close().unwrap();
1098    /// # let file = Bytes::from(file);
1099    /// #
1100    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1101    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1102    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1103    ///
1104    /// // Should be able to read from both in parallel
1105    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1106    /// ```
1107    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1108        Self::new_builder(SyncReader(input), metadata)
1109    }
1110
1111    /// Read bloom filter for a column in a row group
1112    ///
1113    /// Returns `None` if the column does not have a bloom filter
1114    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
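    ///
    /// For example, a sketch that probes the bloom filter of the first column
    /// of the first row group (the `data.parquet` path and the probed value
    /// are illustrative; the value type must match the column's physical type):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // A bloom filter can return false positives but never false negatives
    ///     if !sbbf.check(&123_i64) {
    ///         println!("row group 0 definitely does not contain the value 123");
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```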
1116    pub fn get_row_group_column_bloom_filter(
1117        &self,
1118        row_group_idx: usize,
1119        column_idx: usize,
1120    ) -> Result<Option<Sbbf>> {
1121        let metadata = self.metadata.row_group(row_group_idx);
1122        let column_metadata = metadata.column(column_idx);
1123
1124        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1125            offset
1126                .try_into()
1127                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1128        } else {
1129            return Ok(None);
1130        };
1131
1132        let buffer = match column_metadata.bloom_filter_length() {
1133            Some(length) => self.input.0.get_bytes(offset, length as usize),
1134            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1135        }?;
1136
1137        let (header, bitset_offset) =
1138            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1139
1140        match header.algorithm {
1141            BloomFilterAlgorithm::BLOCK => {
1142                // this match exists to future proof the singleton algorithm enum
1143            }
1144        }
1145        match header.compression {
1146            BloomFilterCompression::UNCOMPRESSED => {
1147                // this match exists to future proof the singleton compression enum
1148            }
1149        }
1150        match header.hash {
1151            BloomFilterHash::XXHASH => {
1152                // this match exists to future proof the singleton hash enum
1153            }
1154        }
1155
1156        let bitset = match column_metadata.bloom_filter_length() {
1157            Some(_) => buffer.slice(
1158                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1159                    - TryInto::<usize>::try_into(offset).unwrap())..,
1160            ),
1161            None => {
1162                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1163                    ParquetError::General("Bloom filter length is invalid".to_string())
1164                })?;
1165                self.input.0.get_bytes(bitset_offset, bitset_length)?
1166            }
1167        };
1168        Ok(Some(Sbbf::new(&bitset)))
1169    }
1170
1171    /// Build a [`ParquetRecordBatchReader`]
1172    ///
1173    /// Note: this will eagerly evaluate any `RowFilter` before returning
1174    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1175        let Self {
1176            input,
1177            metadata,
1178            schema: _,
1179            fields,
1180            batch_size,
1181            row_groups,
1182            projection,
1183            mut filter,
1184            selection,
1185            row_selection_policy,
1186            limit,
1187            offset,
1188            metrics,
1189            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1190            max_predicate_cache_size: _,
1191        } = self;
1192
        // Try to avoid allocating a large buffer
1194        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1195
1196        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1197
1198        let reader = ReaderRowGroups {
1199            reader: Arc::new(input.0),
1200            metadata,
1201            row_groups,
1202        };
1203
1204        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1205            .with_selection(selection)
1206            .with_row_selection_policy(row_selection_policy);
1207
1208        // Update selection based on any filters
1209        if let Some(filter) = filter.as_mut() {
1210            for predicate in filter.predicates.iter_mut() {
1211                // break early if we have ruled out all rows
1212                if !plan_builder.selects_any() {
1213                    break;
1214                }
1215
1216                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1217                    .with_parquet_metadata(&reader.metadata)
1218                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1219
1220                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1221            }
1222        }
1223
1224        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1225            .with_parquet_metadata(&reader.metadata)
1226            .build_array_reader(fields.as_deref(), &projection)?;
1227
1228        let read_plan = plan_builder
1229            .limited(reader.num_rows())
1230            .with_offset(offset)
1231            .with_limit(limit)
1232            .build_limited()
1233            .build();
1234
1235        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1236    }
1237}
1238
1239struct ReaderRowGroups<T: ChunkReader> {
1240    reader: Arc<T>,
1241
1242    metadata: Arc<ParquetMetaData>,
1243    /// Optional list of row group indices to scan
1244    row_groups: Vec<usize>,
1245}
1246
1247impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1248    fn num_rows(&self) -> usize {
1249        let meta = self.metadata.row_groups();
1250        self.row_groups
1251            .iter()
1252            .map(|x| meta[*x].num_rows() as usize)
1253            .sum()
1254    }
1255
1256    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1257        Ok(Box::new(ReaderPageIterator {
1258            column_idx: i,
1259            reader: self.reader.clone(),
1260            metadata: self.metadata.clone(),
1261            row_groups: self.row_groups.clone().into_iter(),
1262        }))
1263    }
1264
1265    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1266        Box::new(
1267            self.row_groups
1268                .iter()
1269                .map(move |i| self.metadata.row_group(*i)),
1270        )
1271    }
1272
1273    fn metadata(&self) -> &ParquetMetaData {
1274        self.metadata.as_ref()
1275    }
1276}
1277
1278struct ReaderPageIterator<T: ChunkReader> {
1279    reader: Arc<T>,
1280    column_idx: usize,
1281    row_groups: std::vec::IntoIter<usize>,
1282    metadata: Arc<ParquetMetaData>,
1283}
1284
1285impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1286    /// Return the next SerializedPageReader
1287    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1288        let rg = self.metadata.row_group(rg_idx);
1289        let column_chunk_metadata = rg.column(self.column_idx);
1290        let offset_index = self.metadata.offset_index();
1291        // The offset index may be absent, in which case `i[rg_idx]` is empty.
1292        // Filter out empty `i[rg_idx]` to avoid a panic when indexing `i[rg_idx][self.column_idx]`.
1293        let page_locations = offset_index
1294            .filter(|i| !i[rg_idx].is_empty())
1295            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1296        let total_rows = rg.num_rows() as usize;
1297        let reader = self.reader.clone();
1298
1299        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1300            .add_crypto_context(
1301                rg_idx,
1302                self.column_idx,
1303                self.metadata.as_ref(),
1304                column_chunk_metadata,
1305            )
1306    }
1307}
1308
1309impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1310    type Item = Result<Box<dyn PageReader>>;
1311
1312    fn next(&mut self) -> Option<Self::Item> {
1313        let rg_idx = self.row_groups.next()?;
1314        let page_reader = self
1315            .next_page_reader(rg_idx)
1316            .map(|page_reader| Box::new(page_reader) as _);
1317        Some(page_reader)
1318    }
1319}
1320
1321impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1322
1323/// Reads Parquet data as Arrow [`RecordBatch`]es
1324///
1325/// This struct implements the [`RecordBatchReader`] trait and is an
1326/// `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]es.
1327///
1328/// Typically reads either from a file or from an in-memory buffer ([`Bytes`])
1329///
1330/// Created by [`ParquetRecordBatchReaderBuilder`]
1331///
1332/// [`Bytes`]: bytes::Bytes
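///
/// # Example
///
/// A minimal sketch (hypothetical `data.parquet` path; assumes the bytes
/// contain a valid Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let data = Bytes::from(std::fs::read("data.parquet").unwrap());
/// // Decode the data into `RecordBatch`es of up to 1024 rows each
/// let reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("read {} rows", batch.num_rows());
/// }
/// ```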
1333pub struct ParquetRecordBatchReader {
1334    array_reader: Box<dyn ArrayReader>,
1335    schema: SchemaRef,
1336    read_plan: ReadPlan,
1337}
1338
1339impl Debug for ParquetRecordBatchReader {
1340    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1341        f.debug_struct("ParquetRecordBatchReader")
1342            .field("array_reader", &"...")
1343            .field("schema", &self.schema)
1344            .field("read_plan", &self.read_plan)
1345            .finish()
1346    }
1347}
1348
1349impl Iterator for ParquetRecordBatchReader {
1350    type Item = Result<RecordBatch, ArrowError>;
1351
1352    fn next(&mut self) -> Option<Self::Item> {
1353        self.next_inner()
1354            .map_err(|arrow_err| arrow_err.into())
1355            .transpose()
1356    }
1357}
1358
1359impl ParquetRecordBatchReader {
1360    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1361    /// has reached the end of the file.
1362    ///
1363    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1364    /// simplify error handling with `?`
1365    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1366        let mut read_records = 0;
1367        let batch_size = self.batch_size();
1368        if batch_size == 0 {
1369            return Ok(None);
1370        }
1371        match self.read_plan.row_selection_cursor_mut() {
1372            RowSelectionCursor::Mask(mask_cursor) => {
1373                // Stream the record batch reader using contiguous segments of the selection
1374                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1375                while !mask_cursor.is_empty() {
1376                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1377                        return Ok(None);
1378                    };
1379
1380                    if mask_chunk.initial_skip > 0 {
1381                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1382                        if skipped != mask_chunk.initial_skip {
1383                            return Err(general_err!(
1384                                "failed to skip rows, expected {}, got {}",
1385                                mask_chunk.initial_skip,
1386                                skipped
1387                            ));
1388                        }
1389                    }
1390
1391                    if mask_chunk.chunk_rows == 0 {
1392                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1393                            return Ok(None);
1394                        }
1395                        continue;
1396                    }
1397
1398                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1399
1400                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1401                    if read == 0 {
1402                        return Err(general_err!(
1403                            "reached end of column while expecting {} rows",
1404                            mask_chunk.chunk_rows
1405                        ));
1406                    }
1407                    if read != mask_chunk.chunk_rows {
1408                        return Err(general_err!(
1409                            "insufficient rows read from array reader - expected {}, got {}",
1410                            mask_chunk.chunk_rows,
1411                            read
1412                        ));
1413                    }
1414
1415                    let array = self.array_reader.consume_batch()?;
1416                    // The column reader exposes the projection as a struct array; convert this
1417                    // into a record batch before applying the boolean filter mask.
1418                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1419                        ArrowError::ParquetError(
1420                            "Struct array reader should return struct array".to_string(),
1421                        )
1422                    })?;
1423
1424                    let filtered_batch =
1425                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1426
1427                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1428                        return Err(general_err!(
1429                            "filtered rows mismatch selection - expected {}, got {}",
1430                            mask_chunk.selected_rows,
1431                            filtered_batch.num_rows()
1432                        ));
1433                    }
1434
1435                    if filtered_batch.num_rows() == 0 {
1436                        continue;
1437                    }
1438
1439                    return Ok(Some(filtered_batch));
1440                }
1441            }
1442            RowSelectionCursor::Selectors(selectors_cursor) => {
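                // Walk the RowSelector runs, skipping ranges marked as skip and reading
                // selected ranges until enough rows accumulate to fill the batch or the
                // cursor is exhausted.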
1443                while read_records < batch_size && !selectors_cursor.is_empty() {
1444                    let front = selectors_cursor.next_selector();
1445                    if front.skip {
1446                        let skipped = self.array_reader.skip_records(front.row_count)?;
1447
1448                        if skipped != front.row_count {
1449                            return Err(general_err!(
1450                                "failed to skip rows, expected {}, got {}",
1451                                front.row_count,
1452                                skipped
1453                            ));
1454                        }
1455                        continue;
1456                    }
1457
1458                    // Currently a `RowSelector` with `row_count == 0` is interpreted as the end of
1459                    // the reader, so skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1460                    if front.row_count == 0 {
1461                        continue;
1462                    }
1463
1464                    // try to read record
1465                    let need_read = batch_size - read_records;
1466                    let to_read = match front.row_count.checked_sub(need_read) {
1467                        Some(remaining) if remaining != 0 => {
1468                            // The selector covers more rows than this batch still needs: read only
1469                            // `need_read` rows and push the remainder back (non-zero check avoids an empty selector).
1470                            selectors_cursor.return_selector(RowSelector::select(remaining));
1471                            need_read
1472                        }
1473                        _ => front.row_count,
1474                    };
1475                    match self.array_reader.read_records(to_read)? {
1476                        0 => break,
1477                        rec => read_records += rec,
1478                    };
1479                }
1480            }
1481            RowSelectionCursor::All => {
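                // No explicit row selection: read up to `batch_size` rows directly.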
1482                self.array_reader.read_records(batch_size)?;
1483            }
1484        };
1485
1486        let array = self.array_reader.consume_batch()?;
1487        let struct_array = array.as_struct_opt().ok_or_else(|| {
1488            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1489        })?;
1490
1491        Ok(if struct_array.len() > 0 {
1492            Some(RecordBatch::from(struct_array))
1493        } else {
1494            None
1495        })
1496    }
1497}
1498
1499impl RecordBatchReader for ParquetRecordBatchReader {
1500    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1501    ///
1502    /// Note that the schema metadata will be stripped here. See
1503    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
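    ///
    /// # Example
    ///
    /// A minimal sketch (hypothetical `data.parquet` path) comparing the builder's
    /// schema with the reader's schema:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_array::RecordBatchReader;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // The builder's schema may include Arrow metadata embedded in the file
    /// let builder_schema = builder.schema().clone();
    /// let reader = builder.build().unwrap();
    /// // The reader's schema has the same fields, but its metadata is stripped
    /// assert_eq!(builder_schema.fields(), reader.schema().fields());
    /// ```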
1504    fn schema(&self) -> SchemaRef {
1505        self.schema.clone()
1506    }
1507}
1508
1509impl ParquetRecordBatchReader {
1510    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1511    ///
1512    /// See [`ParquetRecordBatchReaderBuilder`] for more options
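    ///
    /// # Example
    ///
    /// A minimal sketch of the builder path (hypothetical `data.parquet` path;
    /// the projected leaf column index is arbitrary):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::{ProjectionMask, arrow_reader::ParquetRecordBatchReaderBuilder};
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Project only the first leaf column and decode 1024 rows per batch
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let reader = builder
    ///     .with_projection(mask)
    ///     .with_batch_size(1024)
    ///     .build()
    ///     .unwrap();
    /// # let _ = reader;
    /// ```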
1513    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1514        ParquetRecordBatchReaderBuilder::try_new(reader)?
1515            .with_batch_size(batch_size)
1516            .build()
1517    }
1518
1519    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1520    ///
1521    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
1522    /// higher-level interface for reading parquet data from a file
1523    pub fn try_new_with_row_groups(
1524        levels: &FieldLevels,
1525        row_groups: &dyn RowGroups,
1526        batch_size: usize,
1527        selection: Option<RowSelection>,
1528    ) -> Result<Self> {
1529        // note metrics are not supported in this API
1530        let metrics = ArrowReaderMetrics::disabled();
1531        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1532            .with_parquet_metadata(row_groups.metadata())
1533            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1534
1535        let read_plan = ReadPlanBuilder::new(batch_size)
1536            .with_selection(selection)
1537            .build();
1538
1539        Ok(Self {
1540            array_reader,
1541            schema: Arc::new(Schema::new(levels.fields.clone())),
1542            read_plan,
1543        })
1544    }
1545
1546    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1547    /// a time from the provided [`ArrayReader`], following the row selection encoded in the
1548    /// `read_plan`
1549    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1550        let schema = match array_reader.get_data_type() {
1551            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1552            _ => unreachable!("Struct array reader's data type is not struct!"),
1553        };
1554
1555        Self {
1556            array_reader,
1557            schema: Arc::new(schema),
1558            read_plan,
1559        }
1560    }
1561
1562    #[inline(always)]
1563    pub(crate) fn batch_size(&self) -> usize {
1564        self.read_plan.batch_size()
1565    }
1566}
1567
1568#[cfg(test)]
1569pub(crate) mod tests {
1570    use std::cmp::min;
1571    use std::collections::{HashMap, VecDeque};
1572    use std::fmt::Formatter;
1573    use std::fs::File;
1574    use std::io::Seek;
1575    use std::path::PathBuf;
1576    use std::sync::Arc;
1577
1578    use rand::rngs::StdRng;
1579    use rand::{Rng, RngCore, SeedableRng, random, rng};
1580    use tempfile::tempfile;
1581
1582    use crate::arrow::arrow_reader::{
1583        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
1584        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
1585        RowSelector,
1586    };
1587    use crate::arrow::schema::{
1588        add_encoded_arrow_schema_to_metadata,
1589        virtual_type::{RowGroupIndex, RowNumber},
1590    };
1591    use crate::arrow::{ArrowWriter, ProjectionMask};
1592    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1593    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1594    use crate::data_type::{
1595        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1596        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1597    };
1598    use crate::errors::Result;
1599    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1600    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1601    use crate::file::writer::SerializedFileWriter;
1602    use crate::schema::parser::parse_message_type;
1603    use crate::schema::types::{Type, TypePtr};
1604    use crate::util::test_common::rand_gen::RandGen;
1605    use arrow_array::builder::*;
1606    use arrow_array::cast::AsArray;
1607    use arrow_array::types::{
1608        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1609        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1610        Time64MicrosecondType,
1611    };
1612    use arrow_array::*;
1613    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1614    use arrow_data::{ArrayData, ArrayDataBuilder};
1615    use arrow_schema::{
1616        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1617    };
1618    use arrow_select::concat::concat_batches;
1619    use bytes::Bytes;
1620    use half::f16;
1621    use num_traits::PrimInt;
1622
1623    #[test]
1624    fn test_arrow_reader_all_columns() {
1625        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1626
1627        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1628        let original_schema = Arc::clone(builder.schema());
1629        let reader = builder.build().unwrap();
1630
1631        // Verify that the schema was correctly parsed
1632        assert_eq!(original_schema.fields(), reader.schema().fields());
1633    }
1634
1635    #[test]
1636    fn test_reuse_schema() {
1637        let file = get_test_file("parquet/alltypes-java.parquet");
1638
1639        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1640        let expected = builder.metadata;
1641        let schema = expected.file_metadata().schema_descr_ptr();
1642
1643        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1644        let builder =
1645            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1646
1647        // Verify that the metadata matches
1648        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1649    }
1650
1651    #[test]
1652    fn test_page_encoding_stats_mask() {
1653        let testdata = arrow::util::test_util::parquet_test_data();
1654        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1655        let file = File::open(path).unwrap();
1656
1657        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1658        let builder =
1659            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1660
1661        let row_group_metadata = builder.metadata.row_group(0);
1662
1663        // test page encoding stats
1664        let page_encoding_stats = row_group_metadata
1665            .column(0)
1666            .page_encoding_stats_mask()
1667            .unwrap();
1668        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1669        let page_encoding_stats = row_group_metadata
1670            .column(2)
1671            .page_encoding_stats_mask()
1672            .unwrap();
1673        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1674    }
1675
1676    #[test]
1677    fn test_stats_stats_skipped() {
1678        let testdata = arrow::util::test_util::parquet_test_data();
1679        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1680        let file = File::open(path).unwrap();
1681
1682        // test skipping all
1683        let arrow_options = ArrowReaderOptions::new()
1684            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1685            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1686        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1687            file.try_clone().unwrap(),
1688            arrow_options,
1689        )
1690        .unwrap();
1691
1692        let row_group_metadata = builder.metadata.row_group(0);
1693        for column in row_group_metadata.columns() {
1694            assert!(column.page_encoding_stats().is_none());
1695            assert!(column.page_encoding_stats_mask().is_none());
1696            assert!(column.statistics().is_none());
1697        }
1698
1699        // test skipping all but one column and converting to mask
1700        let arrow_options = ArrowReaderOptions::new()
1701            .with_encoding_stats_as_mask(true)
1702            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1703            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1704        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1705            file.try_clone().unwrap(),
1706            arrow_options,
1707        )
1708        .unwrap();
1709
1710        let row_group_metadata = builder.metadata.row_group(0);
1711        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1712            assert!(column.page_encoding_stats().is_none());
1713            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1714            assert_eq!(column.statistics().is_some(), idx == 0);
1715        }
1716    }
1717
1718    #[test]
1719    fn test_size_stats_stats_skipped() {
1720        let testdata = arrow::util::test_util::parquet_test_data();
1721        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1722        let file = File::open(path).unwrap();
1723
1724        // test skipping all
1725        let arrow_options =
1726            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1727        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1728            file.try_clone().unwrap(),
1729            arrow_options,
1730        )
1731        .unwrap();
1732
1733        let row_group_metadata = builder.metadata.row_group(0);
1734        for column in row_group_metadata.columns() {
1735            assert!(column.repetition_level_histogram().is_none());
1736            assert!(column.definition_level_histogram().is_none());
1737            assert!(column.unencoded_byte_array_data_bytes().is_none());
1738        }
1739
1740        // test skipping all but one column and converting to mask
1741        let arrow_options = ArrowReaderOptions::new()
1742            .with_encoding_stats_as_mask(true)
1743            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1744        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1745            file.try_clone().unwrap(),
1746            arrow_options,
1747        )
1748        .unwrap();
1749
1750        let row_group_metadata = builder.metadata.row_group(0);
1751        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1752            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1753            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1754            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1755        }
1756    }
1757
1758    #[test]
1759    fn test_arrow_reader_single_column() {
1760        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1761
1762        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1763        let original_schema = Arc::clone(builder.schema());
1764
1765        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1766        let reader = builder.with_projection(mask).build().unwrap();
1767
1768        // Verify that the schema was correctly parsed
1769        assert_eq!(1, reader.schema().fields().len());
1770        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1771    }
1772
1773    #[test]
1774    fn test_arrow_reader_single_column_by_name() {
1775        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1776
1777        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1778        let original_schema = Arc::clone(builder.schema());
1779
1780        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1781        let reader = builder.with_projection(mask).build().unwrap();
1782
1783        // Verify that the schema was correctly parsed
1784        assert_eq!(1, reader.schema().fields().len());
1785        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1786    }
1787
1788    #[test]
1789    fn test_null_column_reader_test() {
1790        let mut file = tempfile::tempfile().unwrap();
1791
1792        let schema = "
1793            message message {
1794                OPTIONAL INT32 int32;
1795            }
1796        ";
1797        let schema = Arc::new(parse_message_type(schema).unwrap());
1798
1799        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1800        generate_single_column_file_with_data::<Int32Type>(
1801            &[vec![], vec![]],
1802            Some(&def_levels),
1803            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1804            schema,
1805            Some(Field::new("int32", ArrowDataType::Null, true)),
1806            &Default::default(),
1807        )
1808        .unwrap();
1809
1810        file.rewind().unwrap();
1811
1812        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1813        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1814
1815        assert_eq!(batches.len(), 4);
1816        for batch in &batches[0..3] {
1817            assert_eq!(batch.num_rows(), 2);
1818            assert_eq!(batch.num_columns(), 1);
1819            assert_eq!(batch.column(0).null_count(), 2);
1820        }
1821
1822        assert_eq!(batches[3].num_rows(), 1);
1823        assert_eq!(batches[3].num_columns(), 1);
1824        assert_eq!(batches[3].column(0).null_count(), 1);
1825    }
1826
1827    #[test]
1828    fn test_primitive_single_column_reader_test() {
1829        run_single_column_reader_tests::<BoolType, _, BoolType>(
1830            2,
1831            ConvertedType::NONE,
1832            None,
1833            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1834            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1835        );
1836        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1837            2,
1838            ConvertedType::NONE,
1839            None,
1840            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1841            &[
1842                Encoding::PLAIN,
1843                Encoding::RLE_DICTIONARY,
1844                Encoding::DELTA_BINARY_PACKED,
1845                Encoding::BYTE_STREAM_SPLIT,
1846            ],
1847        );
1848        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1849            2,
1850            ConvertedType::NONE,
1851            None,
1852            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1853            &[
1854                Encoding::PLAIN,
1855                Encoding::RLE_DICTIONARY,
1856                Encoding::DELTA_BINARY_PACKED,
1857                Encoding::BYTE_STREAM_SPLIT,
1858            ],
1859        );
1860        run_single_column_reader_tests::<FloatType, _, FloatType>(
1861            2,
1862            ConvertedType::NONE,
1863            None,
1864            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1865            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1866        );
1867    }
1868
1869    #[test]
1870    fn test_unsigned_primitive_single_column_reader_test() {
1871        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1872            2,
1873            ConvertedType::UINT_32,
1874            Some(ArrowDataType::UInt32),
1875            |vals| {
1876                Arc::new(UInt32Array::from_iter(
1877                    vals.iter().map(|x| x.map(|x| x as u32)),
1878                ))
1879            },
1880            &[
1881                Encoding::PLAIN,
1882                Encoding::RLE_DICTIONARY,
1883                Encoding::DELTA_BINARY_PACKED,
1884            ],
1885        );
1886        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1887            2,
1888            ConvertedType::UINT_64,
1889            Some(ArrowDataType::UInt64),
1890            |vals| {
1891                Arc::new(UInt64Array::from_iter(
1892                    vals.iter().map(|x| x.map(|x| x as u64)),
1893                ))
1894            },
1895            &[
1896                Encoding::PLAIN,
1897                Encoding::RLE_DICTIONARY,
1898                Encoding::DELTA_BINARY_PACKED,
1899            ],
1900        );
1901    }
1902
1903    #[test]
1904    fn test_unsigned_roundtrip() {
1905        let schema = Arc::new(Schema::new(vec![
1906            Field::new("uint32", ArrowDataType::UInt32, true),
1907            Field::new("uint64", ArrowDataType::UInt64, true),
1908        ]));
1909
1910        let mut buf = Vec::with_capacity(1024);
1911        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1912
1913        let original = RecordBatch::try_new(
1914            schema,
1915            vec![
1916                Arc::new(UInt32Array::from_iter_values([
1917                    0,
1918                    i32::MAX as u32,
1919                    u32::MAX,
1920                ])),
1921                Arc::new(UInt64Array::from_iter_values([
1922                    0,
1923                    i64::MAX as u64,
1924                    u64::MAX,
1925                ])),
1926            ],
1927        )
1928        .unwrap();
1929
1930        writer.write(&original).unwrap();
1931        writer.close().unwrap();
1932
1933        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1934        let ret = reader.next().unwrap().unwrap();
1935        assert_eq!(ret, original);
1936
1937        // Check they can be downcast to the correct type
1938        ret.column(0)
1939            .as_any()
1940            .downcast_ref::<UInt32Array>()
1941            .unwrap();
1942
1943        ret.column(1)
1944            .as_any()
1945            .downcast_ref::<UInt64Array>()
1946            .unwrap();
1947    }
1948
1949    #[test]
1950    fn test_float16_roundtrip() -> Result<()> {
1951        let schema = Arc::new(Schema::new(vec![
1952            Field::new("float16", ArrowDataType::Float16, false),
1953            Field::new("float16-nullable", ArrowDataType::Float16, true),
1954        ]));
1955
1956        let mut buf = Vec::with_capacity(1024);
1957        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1958
1959        let original = RecordBatch::try_new(
1960            schema,
1961            vec![
1962                Arc::new(Float16Array::from_iter_values([
1963                    f16::EPSILON,
1964                    f16::MIN,
1965                    f16::MAX,
1966                    f16::NAN,
1967                    f16::INFINITY,
1968                    f16::NEG_INFINITY,
1969                    f16::ONE,
1970                    f16::NEG_ONE,
1971                    f16::ZERO,
1972                    f16::NEG_ZERO,
1973                    f16::E,
1974                    f16::PI,
1975                    f16::FRAC_1_PI,
1976                ])),
1977                Arc::new(Float16Array::from(vec![
1978                    None,
1979                    None,
1980                    None,
1981                    Some(f16::NAN),
1982                    Some(f16::INFINITY),
1983                    Some(f16::NEG_INFINITY),
1984                    None,
1985                    None,
1986                    None,
1987                    None,
1988                    None,
1989                    None,
1990                    Some(f16::FRAC_1_PI),
1991                ])),
1992            ],
1993        )?;
1994
1995        writer.write(&original)?;
1996        writer.close()?;
1997
1998        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1999        let ret = reader.next().unwrap()?;
2000        assert_eq!(ret, original);
2001
2002        // Ensure the columns can be downcast to the correct types
2003        ret.column(0).as_primitive::<Float16Type>();
2004        ret.column(1).as_primitive::<Float16Type>();
2005
2006        Ok(())
2007    }
2008
2009    #[test]
2010    fn test_time_utc_roundtrip() -> Result<()> {
2011        let schema = Arc::new(Schema::new(vec![
2012            Field::new(
2013                "time_millis",
2014                ArrowDataType::Time32(TimeUnit::Millisecond),
2015                true,
2016            )
2017            .with_metadata(HashMap::from_iter(vec![(
2018                "adjusted_to_utc".to_string(),
2019                "".to_string(),
2020            )])),
2021            Field::new(
2022                "time_micros",
2023                ArrowDataType::Time64(TimeUnit::Microsecond),
2024                true,
2025            )
2026            .with_metadata(HashMap::from_iter(vec![(
2027                "adjusted_to_utc".to_string(),
2028                "".to_string(),
2029            )])),
2030        ]));
2031
2032        let mut buf = Vec::with_capacity(1024);
2033        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2034
2035        let original = RecordBatch::try_new(
2036            schema,
2037            vec![
2038                Arc::new(Time32MillisecondArray::from(vec![
2039                    Some(-1),
2040                    Some(0),
2041                    Some(86_399_000),
2042                    Some(86_400_000),
2043                    Some(86_401_000),
2044                    None,
2045                ])),
2046                Arc::new(Time64MicrosecondArray::from(vec![
2047                    Some(-1),
2048                    Some(0),
2049                    Some(86_399 * 1_000_000),
2050                    Some(86_400 * 1_000_000),
2051                    Some(86_401 * 1_000_000),
2052                    None,
2053                ])),
2054            ],
2055        )?;
2056
2057        writer.write(&original)?;
2058        writer.close()?;
2059
2060        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2061        let ret = reader.next().unwrap()?;
2062        assert_eq!(ret, original);
2063
2064        // Ensure the columns can be downcast to the correct types
2065        ret.column(0).as_primitive::<Time32MillisecondType>();
2066        ret.column(1).as_primitive::<Time64MicrosecondType>();
2067
2068        Ok(())
2069    }
2070
2071    #[test]
2072    fn test_date32_roundtrip() -> Result<()> {
2073        use arrow_array::Date32Array;
2074
2075        let schema = Arc::new(Schema::new(vec![Field::new(
2076            "date32",
2077            ArrowDataType::Date32,
2078            false,
2079        )]));
2080
2081        let mut buf = Vec::with_capacity(1024);
2082
2083        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2084
2085        let original = RecordBatch::try_new(
2086            schema,
2087            vec![Arc::new(Date32Array::from(vec![
2088                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2089            ]))],
2090        )?;
2091
2092        writer.write(&original)?;
2093        writer.close()?;
2094
2095        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2096        let ret = reader.next().unwrap()?;
2097        assert_eq!(ret, original);
2098
2099        // Ensure the column can be downcast to the correct type
2100        ret.column(0).as_primitive::<Date32Type>();
2101
2102        Ok(())
2103    }
2104
2105    #[test]
2106    fn test_date64_roundtrip() -> Result<()> {
2107        use arrow_array::Date64Array;
2108
2109        let schema = Arc::new(Schema::new(vec![
2110            Field::new("small-date64", ArrowDataType::Date64, false),
2111            Field::new("big-date64", ArrowDataType::Date64, false),
2112            Field::new("invalid-date64", ArrowDataType::Date64, false),
2113        ]));
2114
2115        let mut default_buf = Vec::with_capacity(1024);
2116        let mut coerce_buf = Vec::with_capacity(1024);
2117
2118        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2119
2120        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2121        let mut coerce_writer =
2122            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2123
2124        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2125
2126        let original = RecordBatch::try_new(
2127            schema,
2128            vec![
2129                // small-date64
2130                Arc::new(Date64Array::from(vec![
2131                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2132                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2133                    0,
2134                    1_000 * NUM_MILLISECONDS_IN_DAY,
2135                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2136                ])),
2137                // big-date64
2138                Arc::new(Date64Array::from(vec![
2139                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2140                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2141                    0,
2142                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2143                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2144                ])),
2145                // invalid-date64
2146                Arc::new(Date64Array::from(vec![
2147                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2148                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2149                    1,
2150                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2151                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2152                ])),
2153            ],
2154        )?;
2155
2156        default_writer.write(&original)?;
2157        coerce_writer.write(&original)?;
2158
2159        default_writer.close()?;
2160        coerce_writer.close()?;
2161
2162        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2163        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2164
2165        let default_ret = default_reader.next().unwrap()?;
2166        let coerce_ret = coerce_reader.next().unwrap()?;
2167
2168        // Roundtrip should be successful when the default writer is used
2169        assert_eq!(default_ret, original);
2170
2171        // Only small-date64 should roundtrip successfully when the coerce_types writer is used
2172        assert_eq!(coerce_ret.column(0), original.column(0));
2173        assert_ne!(coerce_ret.column(1), original.column(1));
2174        assert_ne!(coerce_ret.column(2), original.column(2));
2175
2176        // Ensure both can be downcast to the correct type
2177        default_ret.column(0).as_primitive::<Date64Type>();
2178        coerce_ret.column(0).as_primitive::<Date64Type>();
2179
2180        Ok(())
2181    }
2182    struct RandFixedLenGen {}
2183
2184    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2185        fn r#gen(len: i32) -> FixedLenByteArray {
2186            let mut v = vec![0u8; len as usize];
2187            rng().fill_bytes(&mut v);
2188            ByteArray::from(v).into()
2189        }
2190    }
2191
2192    #[test]
2193    fn test_fixed_length_binary_column_reader() {
2194        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2195            20,
2196            ConvertedType::NONE,
2197            None,
2198            |vals| {
2199                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2200                for val in vals {
2201                    match val {
2202                        Some(b) => builder.append_value(b).unwrap(),
2203                        None => builder.append_null(),
2204                    }
2205                }
2206                Arc::new(builder.finish())
2207            },
2208            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2209        );
2210    }
2211
2212    #[test]
2213    fn test_interval_day_time_column_reader() {
2214        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2215            12,
2216            ConvertedType::INTERVAL,
2217            None,
2218            |vals| {
2219                Arc::new(
2220                    vals.iter()
2221                        .map(|x| {
2222                            x.as_ref().map(|b| IntervalDayTime {
2223                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2224                                milliseconds: i32::from_le_bytes(
2225                                    b.as_ref()[8..12].try_into().unwrap(),
2226                                ),
2227                            })
2228                        })
2229                        .collect::<IntervalDayTimeArray>(),
2230                )
2231            },
2232            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2233        );
2234    }
2235
2236    #[test]
2237    fn test_int96_single_column_reader_test() {
2238        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2239
2240        type TypeHintAndConversionFunction =
2241            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2242
2243        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2244            // Test without a specified ArrowType hint.
2245            (None, |vals: &[Option<Int96>]| {
2246                Arc::new(TimestampNanosecondArray::from_iter(
2247                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2248                )) as ArrayRef
2249            }),
2250            // Test other TimeUnits as ArrowType hints.
2251            (
2252                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2253                |vals: &[Option<Int96>]| {
2254                    Arc::new(TimestampSecondArray::from_iter(
2255                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2256                    )) as ArrayRef
2257                },
2258            ),
2259            (
2260                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2261                |vals: &[Option<Int96>]| {
2262                    Arc::new(TimestampMillisecondArray::from_iter(
2263                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2264                    )) as ArrayRef
2265                },
2266            ),
2267            (
2268                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2269                |vals: &[Option<Int96>]| {
2270                    Arc::new(TimestampMicrosecondArray::from_iter(
2271                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2272                    )) as ArrayRef
2273                },
2274            ),
2275            (
2276                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2277                |vals: &[Option<Int96>]| {
2278                    Arc::new(TimestampNanosecondArray::from_iter(
2279                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2280                    )) as ArrayRef
2281                },
2282            ),
2283            // Test another timezone with TimeUnit as ArrowType hints.
2284            (
2285                Some(ArrowDataType::Timestamp(
2286                    TimeUnit::Second,
2287                    Some(Arc::from("-05:00")),
2288                )),
2289                |vals: &[Option<Int96>]| {
2290                    Arc::new(
2291                        TimestampSecondArray::from_iter(
2292                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2293                        )
2294                        .with_timezone("-05:00"),
2295                    ) as ArrayRef
2296                },
2297            ),
2298        ];
2299
2300        resolutions.iter().for_each(|(arrow_type, converter)| {
2301            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2302                2,
2303                ConvertedType::NONE,
2304                arrow_type.clone(),
2305                converter,
2306                encodings,
2307            );
2308        })
2309    }
2310
2311    #[test]
2312    fn test_int96_from_spark_file_with_provided_schema() {
2313        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2314        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2315        // microsecond resolution for the Parquet reader to interpret these values correctly.
2316        use arrow_schema::DataType::Timestamp;
2317        let test_data = arrow::util::test_util::parquet_test_data();
2318        let path = format!("{test_data}/int96_from_spark.parquet");
2319        let file = File::open(path).unwrap();
2320
2321        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2322            "a",
2323            Timestamp(TimeUnit::Microsecond, None),
2324            true,
2325        )]));
2326        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2327
2328        let mut record_reader =
2329            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2330                .unwrap()
2331                .build()
2332                .unwrap();
2333
2334        let batch = record_reader.next().unwrap().unwrap();
2335        assert_eq!(batch.num_columns(), 1);
2336        let column = batch.column(0);
2337        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2338
2339        let expected = Arc::new(Int64Array::from(vec![
2340            Some(1704141296123456),
2341            Some(1704070800000000),
2342            Some(253402225200000000),
2343            Some(1735599600000000),
2344            None,
2345            Some(9089380393200000000),
2346        ]));
2347
2348        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2349        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2350        // anyway, so this should be a zero-copy non-modifying cast.
2351
2352        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2353        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2354
2355        assert_eq!(casted_timestamps.len(), expected.len());
2356
2357        casted_timestamps
2358            .iter()
2359            .zip(expected.iter())
2360            .for_each(|(lhs, rhs)| {
2361                assert_eq!(lhs, rhs);
2362            });
2363    }
2364
2365    #[test]
2366    fn test_int96_from_spark_file_without_provided_schema() {
2367        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2368        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2369        // values when read as nanosecond resolution overflow and result in garbage values.
2370        use arrow_schema::DataType::Timestamp;
2371        let test_data = arrow::util::test_util::parquet_test_data();
2372        let path = format!("{test_data}/int96_from_spark.parquet");
2373        let file = File::open(path).unwrap();
2374
2375        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2376            .unwrap()
2377            .build()
2378            .unwrap();
2379
2380        let batch = record_reader.next().unwrap().unwrap();
2381        assert_eq!(batch.num_columns(), 1);
2382        let column = batch.column(0);
2383        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2384
2385        let expected = Arc::new(Int64Array::from(vec![
2386            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2387            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2388            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2389            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2390            None,
2391            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2392        ]));
2393
2394        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2395        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2396        // anyway, so this should be a zero-copy non-modifying cast.
2397
2398        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2399        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2400
2401        assert_eq!(casted_timestamps.len(), expected.len());
2402
2403        casted_timestamps
2404            .iter()
2405            .zip(expected.iter())
2406            .for_each(|(lhs, rhs)| {
2407                assert_eq!(lhs, rhs);
2408            });
2409    }
2410
2411    struct RandUtf8Gen {}
2412
2413    impl RandGen<ByteArrayType> for RandUtf8Gen {
2414        fn r#gen(len: i32) -> ByteArray {
2415            Int32Type::r#gen(len).to_string().as_str().into()
2416        }
2417    }
2418
2419    #[test]
2420    fn test_utf8_single_column_reader_test() {
2421        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2422            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2423                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2424            })))
2425        }
2426
2427        let encodings = &[
2428            Encoding::PLAIN,
2429            Encoding::RLE_DICTIONARY,
2430            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2431            Encoding::DELTA_BYTE_ARRAY,
2432        ];
2433
2434        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2435            2,
2436            ConvertedType::NONE,
2437            None,
2438            |vals| {
2439                Arc::new(BinaryArray::from_iter(
2440                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2441                ))
2442            },
2443            encodings,
2444        );
2445
2446        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2447            2,
2448            ConvertedType::UTF8,
2449            None,
2450            string_converter::<i32>,
2451            encodings,
2452        );
2453
2454        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2455            2,
2456            ConvertedType::UTF8,
2457            Some(ArrowDataType::Utf8),
2458            string_converter::<i32>,
2459            encodings,
2460        );
2461
2462        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2463            2,
2464            ConvertedType::UTF8,
2465            Some(ArrowDataType::LargeUtf8),
2466            string_converter::<i64>,
2467            encodings,
2468        );
2469
2470        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2471        for key in &small_key_types {
2472            for encoding in encodings {
2473                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2474                opts.encoding = *encoding;
2475
2476                let data_type =
2477                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2478
2479                // Cannot run full test suite as keys overflow, run small test instead
2480                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2481                    opts,
2482                    2,
2483                    ConvertedType::UTF8,
2484                    Some(data_type.clone()),
2485                    move |vals| {
2486                        let vals = string_converter::<i32>(vals);
2487                        arrow::compute::cast(&vals, &data_type).unwrap()
2488                    },
2489                );
2490            }
2491        }
2492
2493        let key_types = [
2494            ArrowDataType::Int16,
2495            ArrowDataType::UInt16,
2496            ArrowDataType::Int32,
2497            ArrowDataType::UInt32,
2498            ArrowDataType::Int64,
2499            ArrowDataType::UInt64,
2500        ];
2501
2502        for key in &key_types {
2503            let data_type =
2504                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2505
2506            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2507                2,
2508                ConvertedType::UTF8,
2509                Some(data_type.clone()),
2510                move |vals| {
2511                    let vals = string_converter::<i32>(vals);
2512                    arrow::compute::cast(&vals, &data_type).unwrap()
2513                },
2514                encodings,
2515            );
2516
2517            let data_type = ArrowDataType::Dictionary(
2518                Box::new(key.clone()),
2519                Box::new(ArrowDataType::LargeUtf8),
2520            );
2521
2522            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2523                2,
2524                ConvertedType::UTF8,
2525                Some(data_type.clone()),
2526                move |vals| {
2527                    let vals = string_converter::<i64>(vals);
2528                    arrow::compute::cast(&vals, &data_type).unwrap()
2529                },
2530                encodings,
2531            );
2532        }
2533    }
2534
2535    #[test]
2536    fn test_decimal_nullable_struct() {
2537        let decimals = Decimal256Array::from_iter_values(
2538            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2539        );
2540
2541        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2542            "decimals",
2543            decimals.data_type().clone(),
2544            false,
2545        )])))
2546        .len(8)
2547        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2548        .child_data(vec![decimals.into_data()])
2549        .build()
2550        .unwrap();
2551
2552        let written =
2553            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2554                .unwrap();
2555
2556        let mut buffer = Vec::with_capacity(1024);
2557        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2558        writer.write(&written).unwrap();
2559        writer.close().unwrap();
2560
2561        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2562            .unwrap()
2563            .collect::<Result<Vec<_>, _>>()
2564            .unwrap();
2565
2566        assert_eq!(&written.slice(0, 3), &read[0]);
2567        assert_eq!(&written.slice(3, 3), &read[1]);
2568        assert_eq!(&written.slice(6, 2), &read[2]);
2569    }
2570
2571    #[test]
2572    fn test_int32_nullable_struct() {
2573        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2574        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2575            "int32",
2576            int32.data_type().clone(),
2577            false,
2578        )])))
2579        .len(8)
2580        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2581        .child_data(vec![int32.into_data()])
2582        .build()
2583        .unwrap();
2584
2585        let written =
2586            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2587                .unwrap();
2588
2589        let mut buffer = Vec::with_capacity(1024);
2590        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2591        writer.write(&written).unwrap();
2592        writer.close().unwrap();
2593
2594        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2595            .unwrap()
2596            .collect::<Result<Vec<_>, _>>()
2597            .unwrap();
2598
2599        assert_eq!(&written.slice(0, 3), &read[0]);
2600        assert_eq!(&written.slice(3, 3), &read[1]);
2601        assert_eq!(&written.slice(6, 2), &read[2]);
2602    }
2603
2604    #[test]
2605    fn test_decimal_list() {
2606        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2607
2608        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2609        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2610            decimals.data_type().clone(),
2611            false,
2612        ))))
2613        .len(7)
2614        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2615        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2616        .child_data(vec![decimals.into_data()])
2617        .build()
2618        .unwrap();
2619
2620        let written =
2621            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2622                .unwrap();
2623
2624        let mut buffer = Vec::with_capacity(1024);
2625        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2626        writer.write(&written).unwrap();
2627        writer.close().unwrap();
2628
2629        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2630            .unwrap()
2631            .collect::<Result<Vec<_>, _>>()
2632            .unwrap();
2633
2634        assert_eq!(&written.slice(0, 3), &read[0]);
2635        assert_eq!(&written.slice(3, 3), &read[1]);
2636        assert_eq!(&written.slice(6, 1), &read[2]);
2637    }
2638
2639    #[test]
2640    fn test_read_decimal_file() {
2641        use arrow_array::Decimal128Array;
2642        let testdata = arrow::util::test_util::parquet_test_data();
2643        let file_variants = vec![
2644            ("byte_array", 4),
2645            ("fixed_length", 25),
2646            ("int32", 4),
2647            ("int64", 10),
2648        ];
2649        for (prefix, target_precision) in file_variants {
2650            let path = format!("{testdata}/{prefix}_decimal.parquet");
2651            let file = File::open(path).unwrap();
2652            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2653
2654            let batch = record_reader.next().unwrap().unwrap();
2655            assert_eq!(batch.num_rows(), 24);
2656            let col = batch
2657                .column(0)
2658                .as_any()
2659                .downcast_ref::<Decimal128Array>()
2660                .unwrap();
2661
2662            let expected = 1..25;
2663
2664            assert_eq!(col.precision(), target_precision);
2665            assert_eq!(col.scale(), 2);
2666
2667            for (i, v) in expected.enumerate() {
2668                assert_eq!(col.value(i), v * 100_i128);
2669            }
2670        }
2671    }
2672
2673    #[test]
2674    fn test_read_float16_nonzeros_file() {
2675        use arrow_array::Float16Array;
2676        let testdata = arrow::util::test_util::parquet_test_data();
2677        // see https://github.com/apache/parquet-testing/pull/40
2678        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2679        let file = File::open(path).unwrap();
2680        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2681
2682        let batch = record_reader.next().unwrap().unwrap();
2683        assert_eq!(batch.num_rows(), 8);
2684        let col = batch
2685            .column(0)
2686            .as_any()
2687            .downcast_ref::<Float16Array>()
2688            .unwrap();
2689
2690        let f16_two = f16::ONE + f16::ONE;
2691
2692        assert_eq!(col.null_count(), 1);
2693        assert!(col.is_null(0));
2694        assert_eq!(col.value(1), f16::ONE);
2695        assert_eq!(col.value(2), -f16_two);
2696        assert!(col.value(3).is_nan());
2697        assert_eq!(col.value(4), f16::ZERO);
2698        assert!(col.value(4).is_sign_positive());
2699        assert_eq!(col.value(5), f16::NEG_ONE);
2700        assert_eq!(col.value(6), f16::NEG_ZERO);
2701        assert!(col.value(6).is_sign_negative());
2702        assert_eq!(col.value(7), f16_two);
2703    }
2704
2705    #[test]
2706    fn test_read_float16_zeros_file() {
2707        use arrow_array::Float16Array;
2708        let testdata = arrow::util::test_util::parquet_test_data();
2709        // see https://github.com/apache/parquet-testing/pull/40
2710        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2711        let file = File::open(path).unwrap();
2712        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2713
2714        let batch = record_reader.next().unwrap().unwrap();
2715        assert_eq!(batch.num_rows(), 3);
2716        let col = batch
2717            .column(0)
2718            .as_any()
2719            .downcast_ref::<Float16Array>()
2720            .unwrap();
2721
2722        assert_eq!(col.null_count(), 1);
2723        assert!(col.is_null(0));
2724        assert_eq!(col.value(1), f16::ZERO);
2725        assert!(col.value(1).is_sign_positive());
2726        assert!(col.value(2).is_nan());
2727    }
2728
2729    #[test]
2730    fn test_read_float32_float64_byte_stream_split() {
2731        let path = format!(
2732            "{}/byte_stream_split.zstd.parquet",
2733            arrow::util::test_util::parquet_test_data(),
2734        );
2735        let file = File::open(path).unwrap();
2736        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2737
2738        let mut row_count = 0;
2739        for batch in record_reader {
2740            let batch = batch.unwrap();
2741            row_count += batch.num_rows();
2742            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2743            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2744
2745            // This file contains floats from a standard normal distribution
2746            for &x in f32_col.values() {
2747                assert!(x > -10.0);
2748                assert!(x < 10.0);
2749            }
2750            for &x in f64_col.values() {
2751                assert!(x > -10.0);
2752                assert!(x < 10.0);
2753            }
2754        }
2755        assert_eq!(row_count, 300);
2756    }
2757
2758    #[test]
2759    fn test_read_extended_byte_stream_split() {
2760        let path = format!(
2761            "{}/byte_stream_split_extended.gzip.parquet",
2762            arrow::util::test_util::parquet_test_data(),
2763        );
2764        let file = File::open(path).unwrap();
2765        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2766
2767        let mut row_count = 0;
2768        for batch in record_reader {
2769            let batch = batch.unwrap();
2770            row_count += batch.num_rows();
2771
2772            // 0,1 are f16
2773            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2774            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2775            assert_eq!(f16_col.len(), f16_bss.len());
2776            f16_col
2777                .iter()
2778                .zip(f16_bss.iter())
2779                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2780
2781            // 2,3 are f32
2782            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2783            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2784            assert_eq!(f32_col.len(), f32_bss.len());
2785            f32_col
2786                .iter()
2787                .zip(f32_bss.iter())
2788                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2789
2790            // 4,5 are f64
2791            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2792            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2793            assert_eq!(f64_col.len(), f64_bss.len());
2794            f64_col
2795                .iter()
2796                .zip(f64_bss.iter())
2797                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2798
2799            // 6,7 are i32
2800            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2801            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2802            assert_eq!(i32_col.len(), i32_bss.len());
2803            i32_col
2804                .iter()
2805                .zip(i32_bss.iter())
2806                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2807
2808            // 8,9 are i64
2809            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2810            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2811            assert_eq!(i64_col.len(), i64_bss.len());
2812            i64_col
2813                .iter()
2814                .zip(i64_bss.iter())
2815                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2816
2817            // 10,11 are FLBA(5)
2818            let flba_col = batch.column(10).as_fixed_size_binary();
2819            let flba_bss = batch.column(11).as_fixed_size_binary();
2820            assert_eq!(flba_col.len(), flba_bss.len());
2821            flba_col
2822                .iter()
2823                .zip(flba_bss.iter())
2824                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2825
2826            // 12,13 are FLBA(4) (decimal(7,3))
2827            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2828            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2829            assert_eq!(dec_col.len(), dec_bss.len());
2830            dec_col
2831                .iter()
2832                .zip(dec_bss.iter())
2833                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2834        }
2835        assert_eq!(row_count, 200);
2836    }
2837
2838    #[test]
2839    fn test_read_incorrect_map_schema_file() {
2840        let testdata = arrow::util::test_util::parquet_test_data();
2841        // see https://github.com/apache/parquet-testing/pull/47
2842        let path = format!("{testdata}/incorrect_map_schema.parquet");
2843        let file = File::open(path).unwrap();
2844        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2845
2846        let batch = record_reader.next().unwrap().unwrap();
2847        assert_eq!(batch.num_rows(), 1);
2848
2849        let expected_schema = Schema::new(vec![Field::new(
2850            "my_map",
2851            ArrowDataType::Map(
2852                Arc::new(Field::new(
2853                    "key_value",
2854                    ArrowDataType::Struct(Fields::from(vec![
2855                        Field::new("key", ArrowDataType::Utf8, false),
2856                        Field::new("value", ArrowDataType::Utf8, true),
2857                    ])),
2858                    false,
2859                )),
2860                false,
2861            ),
2862            true,
2863        )]);
2864        assert_eq!(batch.schema().as_ref(), &expected_schema);
2865
2866        assert_eq!(batch.num_rows(), 1);
2867        assert_eq!(batch.column(0).null_count(), 0);
2868        assert_eq!(
2869            batch.column(0).as_map().keys().as_ref(),
2870            &StringArray::from(vec!["parent", "name"])
2871        );
2872        assert_eq!(
2873            batch.column(0).as_map().values().as_ref(),
2874            &StringArray::from(vec!["another", "report"])
2875        );
2876    }
2877
2878    #[test]
2879    fn test_read_dict_fixed_size_binary() {
2880        let schema = Arc::new(Schema::new(vec![Field::new(
2881            "a",
2882            ArrowDataType::Dictionary(
2883                Box::new(ArrowDataType::UInt8),
2884                Box::new(ArrowDataType::FixedSizeBinary(8)),
2885            ),
2886            true,
2887        )]));
2888        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2889        let values = FixedSizeBinaryArray::try_from_iter(
2890            vec![
2891                (0u8..8u8).collect::<Vec<u8>>(),
2892                (24u8..32u8).collect::<Vec<u8>>(),
2893            ]
2894            .into_iter(),
2895        )
2896        .unwrap();
2897        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2898        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2899
2900        let mut buffer = Vec::with_capacity(1024);
2901        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2902        writer.write(&batch).unwrap();
2903        writer.close().unwrap();
2904        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2905            .unwrap()
2906            .collect::<Result<Vec<_>, _>>()
2907            .unwrap();
2908
2909        assert_eq!(read.len(), 1);
2910        assert_eq!(&batch, &read[0])
2911    }
2912
2913    #[test]
2914    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2915        // The `StructArrayReader` checks the definition and repetition levels of the first
2916        // child column in the struct to determine nullability for the struct. If the first
2917        // column is being read by a `ByteArrayDictionaryReader`, we need to ensure that the
2918        // nullability is interpreted correctly from the rep/def level buffers managed by
2919        // that array reader.
2920
2921        let struct_fields = Fields::from(vec![
2922            Field::new(
2923                "city",
2924                ArrowDataType::Dictionary(
2925                    Box::new(ArrowDataType::UInt8),
2926                    Box::new(ArrowDataType::Utf8),
2927                ),
2928                true,
2929            ),
2930            Field::new("name", ArrowDataType::Utf8, true),
2931        ]);
2932        let schema = Arc::new(Schema::new(vec![Field::new(
2933            "items",
2934            ArrowDataType::Struct(struct_fields.clone()),
2935            true,
2936        )]));
2937
2938        let items_arr = StructArray::new(
2939            struct_fields,
2940            vec![
2941                Arc::new(DictionaryArray::new(
2942                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2943                    Arc::new(StringArray::from_iter_values(vec![
2944                        "quebec",
2945                        "fredericton",
2946                        "halifax",
2947                    ])),
2948                )),
2949                Arc::new(StringArray::from_iter_values(vec![
2950                    "albert", "terry", "lance", "", "tim",
2951                ])),
2952            ],
2953            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2954        );
2955
2956        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2957        let mut buffer = Vec::with_capacity(1024);
2958        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2959        writer.write(&batch).unwrap();
2960        writer.close().unwrap();
2961        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2962            .unwrap()
2963            .collect::<Result<Vec<_>, _>>()
2964            .unwrap();
2965
2966        assert_eq!(read.len(), 1);
2967        assert_eq!(&batch, &read[0])
2968    }
2969
2970    /// Parameters for single_column_reader_test
2971    #[derive(Clone)]
2972    struct TestOptions {
2973        /// Number of row groups to write to parquet (total rows written =
2974        /// num_row_groups * num_rows)
2975        num_row_groups: usize,
2976        /// Total number of rows per row group
2977        num_rows: usize,
2978        /// Size of batches to read back
2979        record_batch_size: usize,
2980        /// Percentage of nulls in column or None if required
2981        null_percent: Option<usize>,
2982        /// Set write batch size
2983        ///
2984        /// This is the number of rows that are written at once to a page and
2985        /// therefore acts as a bound on the page granularity of a row group
2986        write_batch_size: usize,
2987        /// Maximum size of page in bytes
2988        max_data_page_size: usize,
2989        /// Maximum size of dictionary page in bytes
2990        max_dict_page_size: usize,
2991        /// Writer version
2992        writer_version: WriterVersion,
2993        /// Enabled statistics
2994        enabled_statistics: EnabledStatistics,
2995        /// Encoding
2996        encoding: Encoding,
2997        /// row selections and total selected row count
2998        row_selections: Option<(RowSelection, usize)>,
2999        /// row filter
3000        row_filter: Option<Vec<bool>>,
3001        /// limit
3002        limit: Option<usize>,
3003        /// offset
3004        offset: Option<usize>,
3005    }
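
    // For reference, a typical combination from `all_options` below chains the
    // builder-style helpers defined after the `Debug` impl, e.g. 2 row groups
    // of 256 rows with 25% nulls, read back through a row selection with a
    // limit of 10:
    //
    //     TestOptions::new(2, 256, 93)
    //         .with_null_percent(25)
    //         .with_row_selections()
    //         .with_limit(10);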
3006
3007    /// Manually implement `Debug` to avoid printing the entire contents of row_selections and row_filter
3008    impl std::fmt::Debug for TestOptions {
3009        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3010            f.debug_struct("TestOptions")
3011                .field("num_row_groups", &self.num_row_groups)
3012                .field("num_rows", &self.num_rows)
3013                .field("record_batch_size", &self.record_batch_size)
3014                .field("null_percent", &self.null_percent)
3015                .field("write_batch_size", &self.write_batch_size)
3016                .field("max_data_page_size", &self.max_data_page_size)
3017                .field("max_dict_page_size", &self.max_dict_page_size)
3018                .field("writer_version", &self.writer_version)
3019                .field("enabled_statistics", &self.enabled_statistics)
3020                .field("encoding", &self.encoding)
3021                .field("row_selections", &self.row_selections.is_some())
3022                .field("row_filter", &self.row_filter.is_some())
3023                .field("limit", &self.limit)
3024                .field("offset", &self.offset)
3025                .finish()
3026        }
3027    }
3028
3029    impl Default for TestOptions {
3030        fn default() -> Self {
3031            Self {
3032                num_row_groups: 2,
3033                num_rows: 100,
3034                record_batch_size: 15,
3035                null_percent: None,
3036                write_batch_size: 64,
3037                max_data_page_size: 1024 * 1024,
3038                max_dict_page_size: 1024 * 1024,
3039                writer_version: WriterVersion::PARQUET_1_0,
3040                enabled_statistics: EnabledStatistics::Page,
3041                encoding: Encoding::PLAIN,
3042                row_selections: None,
3043                row_filter: None,
3044                limit: None,
3045                offset: None,
3046            }
3047        }
3048    }
3049
3050    impl TestOptions {
3051        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3052            Self {
3053                num_row_groups,
3054                num_rows,
3055                record_batch_size,
3056                ..Default::default()
3057            }
3058        }
3059
3060        fn with_null_percent(self, null_percent: usize) -> Self {
3061            Self {
3062                null_percent: Some(null_percent),
3063                ..self
3064            }
3065        }
3066
3067        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3068            Self {
3069                max_data_page_size,
3070                ..self
3071            }
3072        }
3073
3074        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3075            Self {
3076                max_dict_page_size,
3077                ..self
3078            }
3079        }
3080
3081        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3082            Self {
3083                enabled_statistics,
3084                ..self
3085            }
3086        }
3087
3088        fn with_row_selections(self) -> Self {
3089            assert!(self.row_filter.is_none(), "Must set row selection first");
3090
3091            let mut rng = rng();
3092            let step = rng.random_range(self.record_batch_size..self.num_rows);
3093            let row_selections = create_test_selection(
3094                step,
3095                self.num_row_groups * self.num_rows,
3096                rng.random::<bool>(),
3097            );
3098            Self {
3099                row_selections: Some(row_selections),
3100                ..self
3101            }
3102        }
3103
3104        fn with_row_filter(self) -> Self {
3105            let row_count = match &self.row_selections {
3106                Some((_, count)) => *count,
3107                None => self.num_row_groups * self.num_rows,
3108            };
3109
3110            let mut rng = rng();
3111            Self {
3112                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3113                ..self
3114            }
3115        }
3116
3117        fn with_limit(self, limit: usize) -> Self {
3118            Self {
3119                limit: Some(limit),
3120                ..self
3121            }
3122        }
3123
3124        fn with_offset(self, offset: usize) -> Self {
3125            Self {
3126                offset: Some(offset),
3127                ..self
3128            }
3129        }
3130
3131        fn writer_props(&self) -> WriterProperties {
3132            let builder = WriterProperties::builder()
3133                .set_data_page_size_limit(self.max_data_page_size)
3134                .set_write_batch_size(self.write_batch_size)
3135                .set_writer_version(self.writer_version)
3136                .set_statistics_enabled(self.enabled_statistics);
3137
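            // Dictionary encodings are requested via the dictionary flags below;
            // an explicit `set_encoding` is only applied for the non-dictionary
            // variants.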
3138            let builder = match self.encoding {
3139                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3140                    .set_dictionary_enabled(true)
3141                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3142                _ => builder
3143                    .set_dictionary_enabled(false)
3144                    .set_encoding(self.encoding),
3145            };
3146
3147            builder.build()
3148        }
3149    }
3150
3151    /// Create a parquet file and then read it back with
3152    /// `ParquetRecordBatchReaderBuilder`, running each case in a standard
3153    /// set of `TestOptions`.
3154    ///
3155    /// `rand_max` represents the maximum size of value to pass to the
3156    /// value generator
3157    fn run_single_column_reader_tests<T, F, G>(
3158        rand_max: i32,
3159        converted_type: ConvertedType,
3160        arrow_type: Option<ArrowDataType>,
3161        converter: F,
3162        encodings: &[Encoding],
3163    ) where
3164        T: DataType,
3165        G: RandGen<T>,
3166        F: Fn(&[Option<T::T>]) -> ArrayRef,
3167    {
3168        let all_options = vec![
3169            // choose record_batch_size (15) so batches cross row
3170            // group boundaries (2 row groups of 100 rows each).
3171            TestOptions::new(2, 100, 15),
3172            // choose record_batch_size (5) so batches sometimes fall
3173            // exactly on row group boundaries (3 row groups of 25
3174            // rows each). Tests buffer
3175            // refilling edge cases.
3176            TestOptions::new(3, 25, 5),
3177            // Choose record_batch_size (25) so batches fall
3178            // exactly on row group boundaries (4 row groups of
3179            // 100 rows each). Tests buffer refilling edge cases.
3180            TestOptions::new(4, 100, 25),
3181            // Set maximum page size so row groups have multiple pages
3182            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3183            // Set small dictionary page size to test dictionary fallback
3184            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3185            // Test optional but with no nulls
3186            TestOptions::new(2, 256, 127).with_null_percent(0),
3187            // Test optional with nulls
3188            TestOptions::new(2, 256, 93).with_null_percent(25),
3189            // Test with limit of 0
3190            TestOptions::new(4, 100, 25).with_limit(0),
3191            // Test with limit of 50
3192            TestOptions::new(4, 100, 25).with_limit(50),
3193            // Test with limit of 10
3194            TestOptions::new(4, 100, 25).with_limit(10),
3195            // Test with limit larger than number of rows
3196            TestOptions::new(4, 100, 25).with_limit(101),
3197            // Test with limit + offset less than number of rows
3198            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3199            // Test with limit + offset equal to number of rows
3200            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3201            // Test with limit + offset larger than number of rows
3202            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3203            // Test with no page-level statistics
3204            TestOptions::new(2, 256, 91)
3205                .with_null_percent(25)
3206                .with_enabled_statistics(EnabledStatistics::Chunk),
3207            // Test with no statistics
3208            TestOptions::new(2, 256, 91)
3209                .with_null_percent(25)
3210                .with_enabled_statistics(EnabledStatistics::None),
3211            // Test with all null
3212            TestOptions::new(2, 128, 91)
3213                .with_null_percent(100)
3214                .with_enabled_statistics(EnabledStatistics::None),
3215            // Test skip
3216
3217            // choose record_batch_size (15) so batches cross row
3218            // group boundaries (2 row groups of 100 rows each).
3219            TestOptions::new(2, 100, 15).with_row_selections(),
3220            // choose record_batch_size (5) so batches sometimes fall
3221            // exactly on row group boundaries (3 row groups of 25
3222            // rows each). Tests buffer
3223            // refilling edge cases.
3224            TestOptions::new(3, 25, 5).with_row_selections(),
3225            // Choose record_batch_size (25) so batches fall
3226            // exactly on row group boundaries (4 row groups of
3227            // 100 rows each). Tests buffer refilling edge cases.
3228            TestOptions::new(4, 100, 25).with_row_selections(),
3229            // Set maximum page size so row groups have multiple pages
3230            TestOptions::new(3, 256, 73)
3231                .with_max_data_page_size(128)
3232                .with_row_selections(),
3233            // Set small dictionary page size to test dictionary fallback
3234            TestOptions::new(3, 256, 57)
3235                .with_max_dict_page_size(128)
3236                .with_row_selections(),
3237            // Test optional but with no nulls
3238            TestOptions::new(2, 256, 127)
3239                .with_null_percent(0)
3240                .with_row_selections(),
3241            // Test optional with nulls
3242            TestOptions::new(2, 256, 93)
3243                .with_null_percent(25)
3244                .with_row_selections(),
3245            // Test optional with nulls, row selections and a limit
3246            TestOptions::new(2, 256, 93)
3247                .with_null_percent(25)
3248                .with_row_selections()
3249                .with_limit(10),
3250            // Test optional with nulls, row selections, an offset and a limit
3251            TestOptions::new(2, 256, 93)
3252                .with_null_percent(25)
3253                .with_row_selections()
3254                .with_offset(20)
3255                .with_limit(10),
3256            // Test filter
3257
3258            // Test with row filter
3259            TestOptions::new(4, 100, 25).with_row_filter(),
3260            // Test with row selection and row filter
3261            TestOptions::new(4, 100, 25)
3262                .with_row_selections()
3263                .with_row_filter(),
3264            // Test with nulls, row filter and small pages
3265            TestOptions::new(2, 256, 93)
3266                .with_null_percent(25)
3267                .with_max_data_page_size(10)
3268                .with_row_filter(),
3269            // Test with nulls, row selection, row filter and small pages
3270            TestOptions::new(2, 256, 93)
3271                .with_null_percent(25)
3272                .with_max_data_page_size(10)
3273                .with_row_selections()
3274                .with_row_filter(),
3275            // Test with row selection and no offset index and small pages
3276            TestOptions::new(2, 256, 93)
3277                .with_enabled_statistics(EnabledStatistics::None)
3278                .with_max_data_page_size(10)
3279                .with_row_selections(),
3280        ];
3281
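        // Every option above is exercised as a full cross product with both
        // writer versions and each requested encoding.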
3282        all_options.into_iter().for_each(|opts| {
3283            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3284                for encoding in encodings {
3285                    let opts = TestOptions {
3286                        writer_version,
3287                        encoding: *encoding,
3288                        ..opts.clone()
3289                    };
3290
3291                    single_column_reader_test::<T, _, G>(
3292                        opts,
3293                        rand_max,
3294                        converted_type,
3295                        arrow_type.clone(),
3296                        &converter,
3297                    )
3298                }
3299            }
3300        });
3301    }
3302
3303    /// Create a parquet file and then read it back with
3304    /// `ParquetRecordBatchReaderBuilder` using the parameters described in
3305    /// `opts`.
3306    fn single_column_reader_test<T, F, G>(
3307        opts: TestOptions,
3308        rand_max: i32,
3309        converted_type: ConvertedType,
3310        arrow_type: Option<ArrowDataType>,
3311        converter: F,
3312    ) where
3313        T: DataType,
3314        G: RandGen<T>,
3315        F: Fn(&[Option<T::T>]) -> ArrayRef,
3316    {
3317        // Print out options to facilitate debugging failures on CI
3318        println!(
3319            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3320            T::get_physical_type(),
3321            converted_type,
3322            arrow_type,
3323            opts
3324        );
3325
3326        // Generate def_levels according to null_percent
3327        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3328            Some(null_percent) => {
3329                let mut rng = rng();
3330
3331                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3332                    .map(|_| {
3333                        std::iter::from_fn(|| {
3334                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3335                        })
3336                        .take(opts.num_rows)
3337                        .collect()
3338                    })
3339                    .collect();
3340                (Repetition::OPTIONAL, Some(def_levels))
3341            }
3342            None => (Repetition::REQUIRED, None),
3343        };
3344
3345        // Generate random table data
3346        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3347            .map(|idx| {
3348                let null_count = match def_levels.as_ref() {
3349                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3350                    None => 0,
3351                };
3352                G::gen_vec(rand_max, opts.num_rows - null_count)
3353            })
3354            .collect();
3355
3356        let len = match T::get_physical_type() {
3357            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3358            crate::basic::Type::INT96 => 12,
3359            _ => -1,
3360        };
3361
3362        let fields = vec![Arc::new(
3363            Type::primitive_type_builder("leaf", T::get_physical_type())
3364                .with_repetition(repetition)
3365                .with_converted_type(converted_type)
3366                .with_length(len)
3367                .build()
3368                .unwrap(),
3369        )];
3370
3371        let schema = Arc::new(
3372            Type::group_type_builder("test_schema")
3373                .with_fields(fields)
3374                .build()
3375                .unwrap(),
3376        );
3377
3378        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3379
3380        let mut file = tempfile::tempfile().unwrap();
3381
3382        generate_single_column_file_with_data::<T>(
3383            &values,
3384            def_levels.as_ref(),
3385            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3386            schema,
3387            arrow_field,
3388            &opts,
3389        )
3390        .unwrap();
3391
3392        file.rewind().unwrap();
3393
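        // Ask the reader to load the page index only when page-level statistics
        // were enabled for this case.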
3394        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3395            opts.enabled_statistics == EnabledStatistics::Page,
3396        ));
3397
3398        let mut builder =
3399            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3400
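        // With a row selection, derive the expected rows by walking the
        // selectors in order: skipped ranges are drained and discarded, while
        // selected ranges are drained into the expected set. For example
        // (illustrative), [skip(2), select(3)] over 5 rows keeps rows 2, 3 and 4.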
3401        let expected_data = match opts.row_selections {
3402            Some((selections, row_count)) => {
3403                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3404
3405                let mut skip_data: Vec<Option<T::T>> = vec![];
3406                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3407                for select in dequeue {
3408                    if select.skip {
3409                        without_skip_data.drain(0..select.row_count);
3410                    } else {
3411                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3412                    }
3413                }
3414                builder = builder.with_row_selection(selections);
3415
3416                assert_eq!(skip_data.len(), row_count);
3417                skip_data
3418            }
3419            None => {
3420                // Get the flattened table data
3421                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3422                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3423                expected_data
3424            }
3425        };
3426
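
        // With a row filter, the same boolean mask is applied twice: once here
        // to compute the expected rows, and once inside an `ArrowPredicateFn`
        // that replays the mask batch by batch, advancing `filter_offset` by the
        // number of rows in each batch it evaluates.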
3427        let mut expected_data = match opts.row_filter {
3428            Some(filter) => {
3429                let expected_data = expected_data
3430                    .into_iter()
3431                    .zip(filter.iter())
3432                    .filter_map(|(d, f)| f.then(|| d))
3433                    .collect();
3434
3435                let mut filter_offset = 0;
3436                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3437                    ProjectionMask::all(),
3438                    move |b| {
3439                        let array = BooleanArray::from_iter(
3440                            filter
3441                                .iter()
3442                                .skip(filter_offset)
3443                                .take(b.num_rows())
3444                                .map(|x| Some(*x)),
3445                        );
3446                        filter_offset += b.num_rows();
3447                        Ok(array)
3448                    },
3449                ))]);
3450
3451                builder = builder.with_row_filter(filter);
3452                expected_data
3453            }
3454            None => expected_data,
3455        };
3456
3457        if let Some(offset) = opts.offset {
3458            builder = builder.with_offset(offset);
3459            expected_data = expected_data.into_iter().skip(offset).collect();
3460        }
3461
3462        if let Some(limit) = opts.limit {
3463            builder = builder.with_limit(limit);
3464            expected_data = expected_data.into_iter().take(limit).collect();
3465        }
3466
3467        let mut record_reader = builder
3468            .with_batch_size(opts.record_batch_size)
3469            .build()
3470            .unwrap();
3471
3472        let mut total_read = 0;
3473        loop {
3474            let maybe_batch = record_reader.next();
3475            if total_read < expected_data.len() {
3476                let end = min(total_read + opts.record_batch_size, expected_data.len());
3477                let batch = maybe_batch.unwrap().unwrap();
3478                assert_eq!(end - total_read, batch.num_rows());
3479
3480                let a = converter(&expected_data[total_read..end]);
3481                let b = batch.column(0);
3482
3483                assert_eq!(a.data_type(), b.data_type());
3484                assert_eq!(a.to_data(), b.to_data());
3485                assert_eq!(
3486                    a.as_any().type_id(),
3487                    b.as_any().type_id(),
3488                    "incorrect type ids"
3489                );
3490
3491                total_read = end;
3492            } else {
3493                assert!(maybe_batch.is_none());
3494                break;
3495            }
3496        }
3497    }
3498
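    /// Flattens the per-row-group `values` into a single `Vec<Option<T::T>>`,
    /// inserting `None` wherever the definition level is 0. For example
    /// (illustrative): def_levels `[1, 0, 1]` with values `[a, b]` yields
    /// `[Some(a), None, Some(b)]`.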
3499    fn gen_expected_data<T: DataType>(
3500        def_levels: Option<&Vec<Vec<i16>>>,
3501        values: &[Vec<T::T>],
3502    ) -> Vec<Option<T::T>> {
3503        let data: Vec<Option<T::T>> = match def_levels {
3504            Some(levels) => {
3505                let mut values_iter = values.iter().flatten();
3506                levels
3507                    .iter()
3508                    .flatten()
3509                    .map(|d| match d {
3510                        1 => Some(values_iter.next().cloned().unwrap()),
3511                        0 => None,
3512                        _ => unreachable!(),
3513                    })
3514                    .collect()
3515            }
3516            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3517        };
3518        data
3519    }
3520
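    /// Writes `values` (one `Vec` per row group) to `file` using the low-level
    /// `SerializedFileWriter` API, optionally embedding the encoded Arrow schema
    /// for `field` in the file metadata so readers recover the intended Arrow type.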
3521    fn generate_single_column_file_with_data<T: DataType>(
3522        values: &[Vec<T::T>],
3523        def_levels: Option<&Vec<Vec<i16>>>,
3524        file: File,
3525        schema: TypePtr,
3526        field: Option<Field>,
3527        opts: &TestOptions,
3528    ) -> Result<ParquetMetaData> {
3529        let mut writer_props = opts.writer_props();
3530        if let Some(field) = field {
3531            let arrow_schema = Schema::new(vec![field]);
3532            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3533        }
3534
3535        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3536
3537        for (idx, v) in values.iter().enumerate() {
3538            let def_levels = def_levels.map(|d| d[idx].as_slice());
3539            let mut row_group_writer = writer.next_row_group()?;
3540            {
3541                let mut column_writer = row_group_writer
3542                    .next_column()?
3543                    .expect("Column writer is none!");
3544
3545                column_writer
3546                    .typed::<T>()
3547                    .write_batch(v, def_levels, None)?;
3548
3549                column_writer.close()?;
3550            }
3551            row_group_writer.close()?;
3552        }
3553
3554        writer.close()
3555    }
3556
3557    fn get_test_file(file_name: &str) -> File {
3558        let mut path = PathBuf::new();
3559        path.push(arrow::util::test_util::arrow_test_data());
3560        path.push(file_name);
3561
3562        File::open(path.as_path()).expect("File not found!")
3563    }
3564
3565    #[test]
3566    fn test_read_structs() {
3567        // This particular test file has columns of struct types where there is
3568        // a column that has the same name as one of the struct fields
3569        // (see: ARROW-11452)
3570        let testdata = arrow::util::test_util::parquet_test_data();
3571        let path = format!("{testdata}/nested_structs.rust.parquet");
3572        let file = File::open(&path).unwrap();
3573        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3574
3575        for batch in record_batch_reader {
3576            batch.unwrap();
3577        }
3578
3579        let file = File::open(&path).unwrap();
3580        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3581
3582        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3583        let projected_reader = builder
3584            .with_projection(mask)
3585            .with_batch_size(60)
3586            .build()
3587            .unwrap();
3588
3589        let expected_schema = Schema::new(vec![
3590            Field::new(
3591                "roll_num",
3592                ArrowDataType::Struct(Fields::from(vec![Field::new(
3593                    "count",
3594                    ArrowDataType::UInt64,
3595                    false,
3596                )])),
3597                false,
3598            ),
3599            Field::new(
3600                "PC_CUR",
3601                ArrowDataType::Struct(Fields::from(vec![
3602                    Field::new("mean", ArrowDataType::Int64, false),
3603                    Field::new("sum", ArrowDataType::Int64, false),
3604                ])),
3605                false,
3606            ),
3607        ]);
3608
3609        // Tests for #1652 and #1654
3610        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3611
3612        for batch in projected_reader {
3613            let batch = batch.unwrap();
3614            assert_eq!(batch.schema().as_ref(), &expected_schema);
3615        }
3616    }
3617
3618    #[test]
3619    // same as test_read_structs but constructs projection mask via column names
3620    fn test_read_structs_by_name() {
3621        let testdata = arrow::util::test_util::parquet_test_data();
3622        let path = format!("{testdata}/nested_structs.rust.parquet");
3623        let file = File::open(&path).unwrap();
3624        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3625
3626        for batch in record_batch_reader {
3627            batch.unwrap();
3628        }
3629
3630        let file = File::open(&path).unwrap();
3631        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3632
3633        let mask = ProjectionMask::columns(
3634            builder.parquet_schema(),
3635            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3636        );
3637        let projected_reader = builder
3638            .with_projection(mask)
3639            .with_batch_size(60)
3640            .build()
3641            .unwrap();
3642
3643        let expected_schema = Schema::new(vec![
3644            Field::new(
3645                "roll_num",
3646                ArrowDataType::Struct(Fields::from(vec![Field::new(
3647                    "count",
3648                    ArrowDataType::UInt64,
3649                    false,
3650                )])),
3651                false,
3652            ),
3653            Field::new(
3654                "PC_CUR",
3655                ArrowDataType::Struct(Fields::from(vec![
3656                    Field::new("mean", ArrowDataType::Int64, false),
3657                    Field::new("sum", ArrowDataType::Int64, false),
3658                ])),
3659                false,
3660            ),
3661        ]);
3662
3663        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3664
3665        for batch in projected_reader {
3666            let batch = batch.unwrap();
3667            assert_eq!(batch.schema().as_ref(), &expected_schema);
3668        }
3669    }
3670
3671    #[test]
3672    fn test_read_maps() {
3673        let testdata = arrow::util::test_util::parquet_test_data();
3674        let path = format!("{testdata}/nested_maps.snappy.parquet");
3675        let file = File::open(path).unwrap();
3676        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3677
3678        for batch in record_batch_reader {
3679            batch.unwrap();
3680        }
3681    }
3682
3683    #[test]
3684    fn test_nested_nullability() {
3685        let message_type = "message nested {
3686          OPTIONAL Group group {
3687            REQUIRED INT32 leaf;
3688          }
3689        }";
3690
3691        let file = tempfile::tempfile().unwrap();
3692        let schema = Arc::new(parse_message_type(message_type).unwrap());
3693
3694        {
3695            // Write using low-level parquet API (#1167)
3696            let mut writer =
3697                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3698                    .unwrap();
3699
3700            {
3701                let mut row_group_writer = writer.next_row_group().unwrap();
3702                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3703
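                // def_levels [0, 1, 0, 1]: rows 0 and 2 are null structs,
                // rows 1 and 3 carry the values 34 and 76.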
3704                column_writer
3705                    .typed::<Int32Type>()
3706                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3707                    .unwrap();
3708
3709                column_writer.close().unwrap();
3710                row_group_writer.close().unwrap();
3711            }
3712
3713            writer.close().unwrap();
3714        }
3715
3716        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3717        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3718
3719        let reader = builder.with_projection(mask).build().unwrap();
3720
3721        let expected_schema = Schema::new(vec![Field::new(
3722            "group",
3723            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3724            true,
3725        )]);
3726
3727        let batch = reader.into_iter().next().unwrap().unwrap();
3728        assert_eq!(batch.schema().as_ref(), &expected_schema);
3729        assert_eq!(batch.num_rows(), 4);
3730        assert_eq!(batch.column(0).null_count(), 2);
3731    }
3732
3733    #[test]
3734    fn test_invalid_utf8() {
3735        // a parquet file with 1 column with invalid utf8
3736        let data = vec![
3737            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3738            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3739            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3740            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3741            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3742            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3743            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3744            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3745            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3746            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3747            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3748            82, 49,
3749        ];
3750
3751        let file = Bytes::from(data);
3752        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3753
3754        let error = record_batch_reader.next().unwrap().unwrap_err();
3755
3756        assert!(
3757            error.to_string().contains("invalid utf-8 sequence"),
3758            "{}",
3759            error
3760        );
3761    }
3762
3763    #[test]
3764    fn test_invalid_utf8_string_array() {
3765        test_invalid_utf8_string_array_inner::<i32>();
3766    }
3767
3768    #[test]
3769    fn test_invalid_utf8_large_string_array() {
3770        test_invalid_utf8_string_array_inner::<i64>();
3771    }
3772
3773    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3774        let cases = [
3775            invalid_utf8_first_char::<O>(),
3776            invalid_utf8_first_char_long_strings::<O>(),
3777            invalid_utf8_later_char::<O>(),
3778            invalid_utf8_later_char_long_strings::<O>(),
3779            invalid_utf8_later_char_really_long_strings::<O>(),
3780            invalid_utf8_later_char_really_long_strings2::<O>(),
3781        ];
3782        for array in &cases {
3783            for encoding in STRING_ENCODINGS {
3784                // The data is not valid utf8, so we cannot safely construct a correct
3785                // StringArray; purposely create an invalid StringArray instead
3786                let array = unsafe {
3787                    GenericStringArray::<O>::new_unchecked(
3788                        array.offsets().clone(),
3789                        array.values().clone(),
3790                        array.nulls().cloned(),
3791                    )
3792                };
3793                let data_type = array.data_type().clone();
3794                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3795                let err = read_from_parquet(data).unwrap_err();
3796                let expected_err =
3797                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3798                assert!(
3799                    err.to_string().contains(expected_err),
3800                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3801                );
3802            }
3803        }
3804    }
3805
3806    #[test]
3807    fn test_invalid_utf8_string_view_array() {
3808        let cases = [
3809            invalid_utf8_first_char::<i32>(),
3810            invalid_utf8_first_char_long_strings::<i32>(),
3811            invalid_utf8_later_char::<i32>(),
3812            invalid_utf8_later_char_long_strings::<i32>(),
3813            invalid_utf8_later_char_really_long_strings::<i32>(),
3814            invalid_utf8_later_char_really_long_strings2::<i32>(),
3815        ];
3816
3817        for encoding in STRING_ENCODINGS {
3818            for array in &cases {
3819                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3820                let array = array.as_binary_view();
3821
3822                // The data is not valid utf8, so we cannot safely construct a correct
3823                // StringViewArray; purposely create an invalid StringViewArray instead
3824                let array = unsafe {
3825                    StringViewArray::new_unchecked(
3826                        array.views().clone(),
3827                        array.data_buffers().to_vec(),
3828                        array.nulls().cloned(),
3829                    )
3830                };
3831
3832                let data_type = array.data_type().clone();
3833                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3834                let err = read_from_parquet(data).unwrap_err();
3835                let expected_err =
3836                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3837                assert!(
3838                    err.to_string().contains(expected_err),
3839                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3840                );
3841            }
3842        }
3843    }
3844
3845    /// Encodings suitable for string data
3846    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3847        None,
3848        Some(Encoding::PLAIN),
3849        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3850        Some(Encoding::DELTA_BYTE_ARRAY),
3851    ];
3852
3853    /// Invalid Utf-8 sequence in the first character
3854    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3855    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3856
3857    /// Invalid UTF-8 sequence in a character other than the first
3858    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3859    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3860
3861    /// returns a BinaryArray with invalid UTF8 data in the first character
3862    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3863        let valid: &[u8] = b"   ";
3864        let invalid = INVALID_UTF8_FIRST_CHAR;
3865        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3866    }
3867
3868    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3869    /// string larger than 12 bytes which is handled specially when reading
3870    /// `ByteViewArray`s
3871    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3872        let valid: &[u8] = b"   ";
3873        let mut invalid = vec![];
3874        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3875        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3876        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3877    }
3878
3879    /// returns a BinaryArray with invalid UTF8 data in a character other than
3880    /// the first (this is checked in a special codepath)
3881    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3882        let valid: &[u8] = b"   ";
3883        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3884        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3885    }
3886
3887    /// returns a BinaryArray with invalid UTF8 data in a character other than
3888    /// the first in a string larger than 12 bytes which is handled specially
3889    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3890    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3891        let valid: &[u8] = b"   ";
3892        let mut invalid = vec![];
3893        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3894        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3895        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3896    }
3897
3898    /// returns a BinaryArray with invalid UTF8 data in a character other than
3899    /// the first in a string larger than 128 bytes which is handled specially
3900    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3901    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3902        let valid: &[u8] = b"   ";
3903        let mut invalid = vec![];
3904        for _ in 0..10 {
3905            // each instance is 38 bytes
3906            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3907        }
3908        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3909        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3910    }
3911
3912    /// returns a BinaryArray with a small invalid UTF8 value mixed in with valid
3913    /// strings, including a valid string larger than 128 bytes
3914    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3915        let valid: &[u8] = b"   ";
3916        let mut valid_long = vec![];
3917        for _ in 0..10 {
3918            // each instance is 38 bytes
3919            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3920        }
3921        let invalid = INVALID_UTF8_LATER_CHAR;
3922        GenericBinaryArray::<O>::from_iter(vec![
3923            None,
3924            Some(valid),
3925            Some(invalid),
3926            None,
3927            Some(&valid_long),
3928            Some(valid),
3929        ])
3930    }
3931
3932    /// writes the array into a single column parquet file with the specified
3933    /// encoding.
3934    ///
3935    /// If no encoding is specified, use default (dictionary) encoding
3936    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3937        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3938        let mut data = vec![];
3939        let schema = batch.schema();
3940        let props = encoding.map(|encoding| {
3941            WriterProperties::builder()
3942                // must disable dictionary encoding to actually use encoding
3943                .set_dictionary_enabled(false)
3944                .set_encoding(encoding)
3945                .build()
3946        });
3947
3948        {
3949            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3950            writer.write(&batch).unwrap();
3951            writer.flush().unwrap();
3952            writer.close().unwrap();
3953        };
3954        data
3955    }
3956
3957    /// read the parquet file into a record batch
3958    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3959        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3960            .unwrap()
3961            .build()
3962            .unwrap();
3963
3964        reader.collect()
3965    }
3966
3967    #[test]
3968    fn test_dictionary_preservation() {
3969        let fields = vec![Arc::new(
3970            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3971                .with_repetition(Repetition::OPTIONAL)
3972                .with_converted_type(ConvertedType::UTF8)
3973                .build()
3974                .unwrap(),
3975        )];
3976
3977        let schema = Arc::new(
3978            Type::group_type_builder("test_schema")
3979                .with_fields(fields)
3980                .build()
3981                .unwrap(),
3982        );
3983
3984        let dict_type = ArrowDataType::Dictionary(
3985            Box::new(ArrowDataType::Int32),
3986            Box::new(ArrowDataType::Utf8),
3987        );
3988
3989        let arrow_field = Field::new("leaf", dict_type, true);
3990
3991        let mut file = tempfile::tempfile().unwrap();
3992
3993        let values = vec![
3994            vec![
3995                ByteArray::from("hello"),
3996                ByteArray::from("a"),
3997                ByteArray::from("b"),
3998                ByteArray::from("d"),
3999            ],
4000            vec![
4001                ByteArray::from("c"),
4002                ByteArray::from("a"),
4003                ByteArray::from("b"),
4004            ],
4005        ];
4006
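        // A def level of 1 marks a present value: row group 0 has 8 rows
        // (4 non-null) and row group 1 has 9 rows (3 non-null), giving 17 rows
        // read back below as 6 batches of size 3 (the last batch has 2 rows).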
4007        let def_levels = vec![
4008            vec![1, 0, 0, 1, 0, 0, 1, 1],
4009            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
4010        ];
4011
4012        let opts = TestOptions {
4013            encoding: Encoding::RLE_DICTIONARY,
4014            ..Default::default()
4015        };
4016
4017        generate_single_column_file_with_data::<ByteArrayType>(
4018            &values,
4019            Some(&def_levels),
4020            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
4021            schema,
4022            Some(arrow_field),
4023            &opts,
4024        )
4025        .unwrap();
4026
4027        file.rewind().unwrap();
4028
4029        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
4030
4031        let batches = record_reader
4032            .collect::<Result<Vec<RecordBatch>, _>>()
4033            .unwrap();
4034
4035        assert_eq!(batches.len(), 6);
4036        assert!(batches.iter().all(|x| x.num_columns() == 1));
4037
4038        let row_counts = batches
4039            .iter()
4040            .map(|x| (x.num_rows(), x.column(0).null_count()))
4041            .collect::<Vec<_>>();
4042
4043        assert_eq!(
4044            row_counts,
4045            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
4046        );
4047
4048        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
4049
4050        // First and second batch in same row group -> same dictionary
4051        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
4052        // Third batch spans a row group boundary -> computed dictionary
4053        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
4054        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
4055        // Fourth, fifth and sixth from same row group -> same dictionary
4056        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
4057        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
4058    }
4059
4060    #[test]
4061    fn test_read_null_list() {
4062        let testdata = arrow::util::test_util::parquet_test_data();
4063        let path = format!("{testdata}/null_list.parquet");
4064        let file = File::open(path).unwrap();
4065        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
4066
4067        let batch = record_batch_reader.next().unwrap().unwrap();
4068        assert_eq!(batch.num_rows(), 1);
4069        assert_eq!(batch.num_columns(), 1);
4070        assert_eq!(batch.column(0).len(), 1);
4071
4072        let list = batch
4073            .column(0)
4074            .as_any()
4075            .downcast_ref::<ListArray>()
4076            .unwrap();
4077        assert_eq!(list.len(), 1);
4078        assert!(list.is_valid(0));
4079
4080        let val = list.value(0);
4081        assert_eq!(val.len(), 0);
4082    }
4083
4084    #[test]
4085    fn test_null_schema_inference() {
4086        let testdata = arrow::util::test_util::parquet_test_data();
4087        let path = format!("{testdata}/null_list.parquet");
4088        let file = File::open(path).unwrap();
4089
4090        let arrow_field = Field::new(
4091            "emptylist",
4092            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
4093            true,
4094        );
4095
4096        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4097        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
4098        let schema = builder.schema();
4099        assert_eq!(schema.fields().len(), 1);
4100        assert_eq!(schema.field(0), &arrow_field);
4101    }
4102
4103    #[test]
4104    fn test_skip_metadata() {
4105        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
4106        let field = Field::new("col", col.data_type().clone(), true);
4107
4108        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
4109
4110        let metadata = [("key".to_string(), "value".to_string())]
4111            .into_iter()
4112            .collect();
4113
4114        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
4115
4116        assert_ne!(schema_with_metadata, schema_without_metadata);
4117
4118        let batch =
4119            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
4120
4121        let file = |version: WriterVersion| {
4122            let props = WriterProperties::builder()
4123                .set_writer_version(version)
4124                .build();
4125
4126            let file = tempfile().unwrap();
4127            let mut writer =
4128                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
4129                    .unwrap();
4130            writer.write(&batch).unwrap();
4131            writer.close().unwrap();
4132            file
4133        };
4134
4135        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4136
4137        let v1_reader = file(WriterVersion::PARQUET_1_0);
4138        let v2_reader = file(WriterVersion::PARQUET_2_0);
4139
4140        let arrow_reader =
4141            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
4142        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4143
4144        let reader =
4145            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
4146                .unwrap()
4147                .build()
4148                .unwrap();
4149        assert_eq!(reader.schema(), schema_without_metadata);
4150
4151        let arrow_reader =
4152            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
4153        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4154
4155        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4156            .unwrap()
4157            .build()
4158            .unwrap();
4159        assert_eq!(reader.schema(), schema_without_metadata);
4160    }
4161
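    /// Writes the given `(column name, array)` pairs as a single record batch
    /// to a temporary parquet file with default writer properties and returns
    /// the file handle.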
4162    fn write_parquet_from_iter<I, F>(value: I) -> File
4163    where
4164        I: IntoIterator<Item = (F, ArrayRef)>,
4165        F: AsRef<str>,
4166    {
4167        let batch = RecordBatch::try_from_iter(value).unwrap();
4168        let file = tempfile().unwrap();
4169        let mut writer =
4170            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4171        writer.write(&batch).unwrap();
4172        writer.close().unwrap();
4173        file
4174    }
4175
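    /// Writes `value` to a parquet file, then asserts that building a reader
    /// for it with `ArrowReaderOptions::new().with_schema(schema)` fails with
    /// `expected_error`.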
4176    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4177    where
4178        I: IntoIterator<Item = (F, ArrayRef)>,
4179        F: AsRef<str>,
4180    {
4181        let file = write_parquet_from_iter(value);
4182        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4183        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4184            file.try_clone().unwrap(),
4185            options_with_schema,
4186        );
4187        assert_eq!(builder.err().unwrap().to_string(), expected_error);
4188    }
4189
4190    #[test]
4191    fn test_schema_too_few_columns() {
4192        run_schema_test_with_error(
4193            vec![
4194                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4195                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4196            ],
4197            Arc::new(Schema::new(vec![Field::new(
4198                "int64",
4199                ArrowDataType::Int64,
4200                false,
4201            )])),
4202            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4203        );
4204    }
4205
4206    #[test]
4207    fn test_schema_too_many_columns() {
4208        run_schema_test_with_error(
4209            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4210            Arc::new(Schema::new(vec![
4211                Field::new("int64", ArrowDataType::Int64, false),
4212                Field::new("int32", ArrowDataType::Int32, false),
4213            ])),
4214            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4215        );
4216    }
4217
4218    #[test]
4219    fn test_schema_mismatched_column_names() {
4220        run_schema_test_with_error(
4221            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4222            Arc::new(Schema::new(vec![Field::new(
4223                "other",
4224                ArrowDataType::Int64,
4225                false,
4226            )])),
4227            "Arrow: incompatible arrow schema, expected field named int64 got other",
4228        );
4229    }
4230
4231    #[test]
4232    fn test_schema_incompatible_columns() {
4233        run_schema_test_with_error(
4234            vec![
4235                (
4236                    "col1_invalid",
4237                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4238                ),
4239                (
4240                    "col2_valid",
4241                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4242                ),
4243                (
4244                    "col3_invalid",
4245                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4246                ),
4247            ],
4248            Arc::new(Schema::new(vec![
4249                Field::new("col1_invalid", ArrowDataType::Int32, false),
4250                Field::new("col2_valid", ArrowDataType::Int32, false),
4251                Field::new("col3_invalid", ArrowDataType::Int32, false),
4252            ])),
4253            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4254        );
4255    }
4256
4257    #[test]
4258    fn test_one_incompatible_nested_column() {
4259        let nested_fields = Fields::from(vec![
4260            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4261            Field::new("nested1_invalid", ArrowDataType::Int64, false),
4262        ]);
4263        let nested = StructArray::try_new(
4264            nested_fields,
4265            vec![
4266                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4267                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4268            ],
4269            None,
4270        )
4271        .expect("struct array");
4272        let supplied_nested_fields = Fields::from(vec![
4273            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4274            Field::new("nested1_invalid", ArrowDataType::Int32, false),
4275        ]);
4276        run_schema_test_with_error(
4277            vec![
4278                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4279                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4280                ("nested", Arc::new(nested) as ArrayRef),
4281            ],
4282            Arc::new(Schema::new(vec![
4283                Field::new("col1", ArrowDataType::Int64, false),
4284                Field::new("col2", ArrowDataType::Int32, false),
4285                Field::new(
4286                    "nested",
4287                    ArrowDataType::Struct(supplied_nested_fields),
4288                    false,
4289                ),
4290            ])),
4291            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4292            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4293            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4294        );
4295    }
4296
4297    /// Return parquet data with a single column of utf8 strings
4298    fn utf8_parquet() -> Bytes {
4299        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4300        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4301        let props = None;
4302        // write a parquet file with non-nullable strings
4303        let mut parquet_data = vec![];
4304        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4305        writer.write(&batch).unwrap();
4306        writer.close().unwrap();
4307        Bytes::from(parquet_data)
4308    }
4309
4310    #[test]
4311    fn test_schema_error_bad_types() {
4312        // verify incompatible schemas error on read
4313        let parquet_data = utf8_parquet();
4314
4315        // Ask to read it back with an incompatible schema (int vs string)
4316        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4317            "column1",
4318            arrow::datatypes::DataType::Int32,
4319            false,
4320        )]));
4321
4322        // read it back out
4323        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4324        let err =
4325            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4326                .unwrap_err();
4327        assert_eq!(
4328            err.to_string(),
4329            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4330        )
4331    }
4332
4333    #[test]
4334    fn test_schema_error_bad_nullability() {
4335        // verify incompatible schemas error on read
4336        let parquet_data = utf8_parquet();
4337
4338        // Ask to read it back with an incompatible schema (nullability mismatch)
4339        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4340            "column1",
4341            arrow::datatypes::DataType::Utf8,
4342            true,
4343        )]));
4344
4345        // read it back out
4346        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4347        let err =
4348            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4349                .unwrap_err();
4350        assert_eq!(
4351            err.to_string(),
4352            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4353        )
4354    }
4355
4356    #[test]
4357    fn test_read_binary_as_utf8() {
4358        let file = write_parquet_from_iter(vec![
4359            (
4360                "binary_to_utf8",
4361                Arc::new(BinaryArray::from(vec![
4362                    b"one".as_ref(),
4363                    b"two".as_ref(),
4364                    b"three".as_ref(),
4365                ])) as ArrayRef,
4366            ),
4367            (
4368                "large_binary_to_large_utf8",
4369                Arc::new(LargeBinaryArray::from(vec![
4370                    b"one".as_ref(),
4371                    b"two".as_ref(),
4372                    b"three".as_ref(),
4373                ])) as ArrayRef,
4374            ),
4375            (
4376                "binary_view_to_utf8_view",
4377                Arc::new(BinaryViewArray::from(vec![
4378                    b"one".as_ref(),
4379                    b"two".as_ref(),
4380                    b"three".as_ref(),
4381                ])) as ArrayRef,
4382            ),
4383        ]);
4384        let supplied_fields = Fields::from(vec![
4385            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4386            Field::new(
4387                "large_binary_to_large_utf8",
4388                ArrowDataType::LargeUtf8,
4389                false,
4390            ),
4391            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4392        ]);
4393
4394        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4395        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4396            file.try_clone().unwrap(),
4397            options,
4398        )
4399        .expect("reader builder with schema")
4400        .build()
4401        .expect("reader with schema");
4402
4403        let batch = arrow_reader.next().unwrap().unwrap();
4404        assert_eq!(batch.num_columns(), 3);
4405        assert_eq!(batch.num_rows(), 3);
4406        assert_eq!(
4407            batch
4408                .column(0)
4409                .as_string::<i32>()
4410                .iter()
4411                .collect::<Vec<_>>(),
4412            vec![Some("one"), Some("two"), Some("three")]
4413        );
4414
4415        assert_eq!(
4416            batch
4417                .column(1)
4418                .as_string::<i64>()
4419                .iter()
4420                .collect::<Vec<_>>(),
4421            vec![Some("one"), Some("two"), Some("three")]
4422        );
4423
4424        assert_eq!(
4425            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4426            vec![Some("one"), Some("two"), Some("three")]
4427        );
4428    }
4429
4430    #[test]
4431    #[should_panic(expected = "Invalid UTF8 sequence at")]
4432    fn test_read_non_utf8_binary_as_utf8() {
4433        let file = write_parquet_from_iter(vec![(
4434            "non_utf8_binary",
4435            Arc::new(BinaryArray::from(vec![
4436                b"\xDE\x00\xFF".as_ref(),
4437                b"\xDE\x01\xAA".as_ref(),
4438                b"\xDE\x02\xFF".as_ref(),
4439            ])) as ArrayRef,
4440        )]);
4441        let supplied_fields = Fields::from(vec![Field::new(
4442            "non_utf8_binary",
4443            ArrowDataType::Utf8,
4444            false,
4445        )]);
4446
4447        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4448        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4449            file.try_clone().unwrap(),
4450            options,
4451        )
4452        .expect("reader builder with schema")
4453        .build()
4454        .expect("reader with schema");
4455        arrow_reader.next().unwrap().unwrap_err();
4456    }
4457
4458    #[test]
4459    fn test_with_schema() {
4460        let nested_fields = Fields::from(vec![
4461            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4462            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4463        ]);
4464
4465        let nested_arrays: Vec<ArrayRef> = vec![
4466            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4467            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4468        ];
4469
4470        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4471
4472        let file = write_parquet_from_iter(vec![
4473            (
4474                "int32_to_ts_second",
4475                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4476            ),
4477            (
4478                "date32_to_date64",
4479                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4480            ),
4481            ("nested", Arc::new(nested) as ArrayRef),
4482        ]);
4483
4484        let supplied_nested_fields = Fields::from(vec![
4485            Field::new(
4486                "utf8_to_dict",
4487                ArrowDataType::Dictionary(
4488                    Box::new(ArrowDataType::Int32),
4489                    Box::new(ArrowDataType::Utf8),
4490                ),
4491                false,
4492            ),
4493            Field::new(
4494                "int64_to_ts_nano",
4495                ArrowDataType::Timestamp(
4496                    arrow::datatypes::TimeUnit::Nanosecond,
4497                    Some("+10:00".into()),
4498                ),
4499                false,
4500            ),
4501        ]);
4502
4503        let supplied_schema = Arc::new(Schema::new(vec![
4504            Field::new(
4505                "int32_to_ts_second",
4506                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4507                false,
4508            ),
4509            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4510            Field::new(
4511                "nested",
4512                ArrowDataType::Struct(supplied_nested_fields),
4513                false,
4514            ),
4515        ]));
4516
4517        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4518        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4519            file.try_clone().unwrap(),
4520            options,
4521        )
4522        .expect("reader builder with schema")
4523        .build()
4524        .expect("reader with schema");
4525
4526        assert_eq!(arrow_reader.schema(), supplied_schema);
4527        let batch = arrow_reader.next().unwrap().unwrap();
4528        assert_eq!(batch.num_columns(), 3);
4529        assert_eq!(batch.num_rows(), 4);
4530        assert_eq!(
4531            batch
4532                .column(0)
4533                .as_any()
4534                .downcast_ref::<TimestampSecondArray>()
4535                .expect("downcast to timestamp second")
4536                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4537                .map(|v| v.to_string())
4538                .expect("value as datetime"),
4539            "1970-01-01 01:00:00 +01:00"
4540        );
4541        assert_eq!(
4542            batch
4543                .column(1)
4544                .as_any()
4545                .downcast_ref::<Date64Array>()
4546                .expect("downcast to date64")
4547                .value_as_date(0)
4548                .map(|v| v.to_string())
4549                .expect("value as date"),
4550            "1970-01-01"
4551        );
4552
4553        let nested = batch
4554            .column(2)
4555            .as_any()
4556            .downcast_ref::<StructArray>()
4557            .expect("downcast to struct");
4558
4559        let nested_dict = nested
4560            .column(0)
4561            .as_any()
4562            .downcast_ref::<Int32DictionaryArray>()
4563            .expect("downcast to dictionary");
4564
4565        assert_eq!(
4566            nested_dict
4567                .values()
4568                .as_any()
4569                .downcast_ref::<StringArray>()
4570                .expect("downcast to string")
4571                .iter()
4572                .collect::<Vec<_>>(),
4573            vec![Some("a"), Some("b")]
4574        );
4575
4576        assert_eq!(
4577            nested_dict.keys().iter().collect::<Vec<_>>(),
4578            vec![Some(0), Some(0), Some(0), Some(1)]
4579        );
4580
4581        assert_eq!(
4582            nested
4583                .column(1)
4584                .as_any()
4585                .downcast_ref::<TimestampNanosecondArray>()
4586                .expect("downcast to timestamp nanosecond")
4587                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4588                .map(|v| v.to_string())
4589                .expect("value as datetime"),
4590            "1970-01-01 10:00:00.000000001 +10:00"
4591        );
4592    }
4593
4594    #[test]
4595    fn test_empty_projection() {
4596        let testdata = arrow::util::test_util::parquet_test_data();
4597        let path = format!("{testdata}/alltypes_plain.parquet");
4598        let file = File::open(path).unwrap();
4599
4600        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4601        let file_metadata = builder.metadata().file_metadata();
4602        let expected_rows = file_metadata.num_rows() as usize;
4603
4604        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4605        let batch_reader = builder
4606            .with_projection(mask)
4607            .with_batch_size(2)
4608            .build()
4609            .unwrap();
4610
4611        let mut total_rows = 0;
4612        for maybe_batch in batch_reader {
4613            let batch = maybe_batch.unwrap();
4614            total_rows += batch.num_rows();
4615            assert_eq!(batch.num_columns(), 0);
4616            assert!(batch.num_rows() <= 2);
4617        }
4618
4619        assert_eq!(total_rows, expected_rows);
4620    }
4621
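    /// Writes two batches of `batch_size` empty lists using the given
    /// `row_group_size`, then reads the data back with `batch_size` and checks
    /// that the first two batches each contain exactly `batch_size` rows, even
    /// when a batch straddles a row group boundary.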
4622    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4623        let schema = Arc::new(Schema::new(vec![Field::new(
4624            "list",
4625            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4626            true,
4627        )]));
4628
4629        let mut buf = Vec::with_capacity(1024);
4630
4631        let mut writer = ArrowWriter::try_new(
4632            &mut buf,
4633            schema.clone(),
4634            Some(
4635                WriterProperties::builder()
4636                    .set_max_row_group_size(row_group_size)
4637                    .build(),
4638            ),
4639        )
4640        .unwrap();
4641        for _ in 0..2 {
4642            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4643            for _ in 0..(batch_size) {
4644                list_builder.append(true);
4645            }
4646            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4647                .unwrap();
4648            writer.write(&batch).unwrap();
4649        }
4650        writer.close().unwrap();
4651
4652        let mut record_reader =
4653            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4654        assert_eq!(
4655            batch_size,
4656            record_reader.next().unwrap().unwrap().num_rows()
4657        );
4658        assert_eq!(
4659            batch_size,
4660            record_reader.next().unwrap().unwrap().num_rows()
4661        );
4662    }
4663
4664    #[test]
4665    fn test_row_group_exact_multiple() {
4666        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4667        test_row_group_batch(8, 8);
4668        test_row_group_batch(10, 8);
4669        test_row_group_batch(8, 10);
4670        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4671        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4672        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4673        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4674        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4675    }
4676
4677    /// Given a RecordBatch containing all the column data, return the expected batches given
4678    /// a `batch_size` and `selection`
4679    fn get_expected_batches(
4680        column: &RecordBatch,
4681        selection: &RowSelection,
4682        batch_size: usize,
4683    ) -> Vec<RecordBatch> {
4684        let mut expected_batches = vec![];
4685
4686        let mut selection: VecDeque<_> = selection.clone().into();
4687        let mut row_offset = 0;
4688        let mut last_start = None;
4689        while row_offset < column.num_rows() && !selection.is_empty() {
4690            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4691            while batch_remaining > 0 && !selection.is_empty() {
4692                let (to_read, skip) = match selection.front_mut() {
4693                    Some(selection) if selection.row_count > batch_remaining => {
4694                        selection.row_count -= batch_remaining;
4695                        (batch_remaining, selection.skip)
4696                    }
4697                    Some(_) => {
4698                        let select = selection.pop_front().unwrap();
4699                        (select.row_count, select.skip)
4700                    }
4701                    None => break,
4702                };
4703
4704                batch_remaining -= to_read;
4705
4706                match skip {
4707                    true => {
4708                        if let Some(last_start) = last_start.take() {
4709                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4710                        }
4711                        row_offset += to_read
4712                    }
4713                    false => {
4714                        last_start.get_or_insert(row_offset);
4715                        row_offset += to_read
4716                    }
4717                }
4718            }
4719        }
4720
4721        if let Some(last_start) = last_start.take() {
4722            expected_batches.push(column.slice(last_start, row_offset - last_start))
4723        }
4724
4725        // Sanity check: all batches except the final one should have `batch_size` rows
4726        for batch in &expected_batches[..expected_batches.len() - 1] {
4727            assert_eq!(batch.num_rows(), batch_size);
4728        }
4729
4730        expected_batches
4731    }
4732
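    /// Builds an alternating select/skip `RowSelection` covering `total_len`
    /// rows in runs of `step_len`, starting with a skip when `skip_first` is
    /// true, and returns it together with the number of selected rows.
    ///
    /// For example, `create_test_selection(3, 8, false)` should yield the
    /// selectors `[select(3), skip(3), select(2)]` and a selected count of 5.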
4733    fn create_test_selection(
4734        step_len: usize,
4735        total_len: usize,
4736        skip_first: bool,
4737    ) -> (RowSelection, usize) {
4738        let mut remaining = total_len;
4739        let mut skip = skip_first;
4740        let mut vec = vec![];
4741        let mut selected_count = 0;
4742        while remaining != 0 {
4743            let step = if remaining > step_len {
4744                step_len
4745            } else {
4746                remaining
4747            };
4748            vec.push(RowSelector {
4749                row_count: step,
4750                skip,
4751            });
4752            remaining -= step;
4753            if !skip {
4754                selected_count += step;
4755            }
4756            skip = !skip;
4757        }
4758        (vec.into(), selected_count)
4759    }
4760
4761    #[test]
4762    fn test_scan_row_with_selection() {
4763        let testdata = arrow::util::test_util::parquet_test_data();
4764        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4765        let test_file = File::open(&path).unwrap();
4766
4767        let mut serial_reader =
4768            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4769        let data = serial_reader.next().unwrap().unwrap();
4770
4771        let do_test = |batch_size: usize, selection_len: usize| {
4772            for skip_first in [false, true] {
4773                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4774
4775                let expected = get_expected_batches(&data, &selections, batch_size);
4776                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4777                assert_eq!(
4778                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4779                    expected,
4780                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4781                );
4782            }
4783        };
4784
4785        // total row count 7300
4786        // 1. test selection len more than one page row count
4787        do_test(1000, 1000);
4788
4789        // 2. test selection len less than one page row count
4790        do_test(20, 20);
4791
4792        // 3. test selection_len less than batch_size
4793        do_test(20, 5);
4794
4795        // 4. test selection_len more than batch_size
4796        // (batch_size < selection_len)
4797        do_test(5, 20);
4798
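        // Builds a `ParquetRecordBatchReader` over `test_file` that applies
        // `selections` with the given `batch_size`, loading the page index
        // (`PageIndexPolicy::Required`) before applying the selection.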
4799        fn create_skip_reader(
4800            test_file: &File,
4801            batch_size: usize,
4802            selections: RowSelection,
4803        ) -> ParquetRecordBatchReader {
4804            let options =
4805                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4806            let file = test_file.try_clone().unwrap();
4807            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4808                .unwrap()
4809                .with_batch_size(batch_size)
4810                .with_row_selection(selections)
4811                .build()
4812                .unwrap()
4813        }
4814    }
4815
4816    #[test]
4817    fn test_batch_size_overallocate() {
4818        let testdata = arrow::util::test_util::parquet_test_data();
4819        // `alltypes_plain.parquet` only has 8 rows
4820        let path = format!("{testdata}/alltypes_plain.parquet");
4821        let test_file = File::open(path).unwrap();
4822
4823        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4824        let num_rows = builder.metadata.file_metadata().num_rows();
4825        let reader = builder
4826            .with_batch_size(1024)
4827            .with_projection(ProjectionMask::all())
4828            .build()
4829            .unwrap();
4830        assert_ne!(1024, num_rows);
4831        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4832    }
4833
4834    #[test]
4835    fn test_read_with_page_index_enabled() {
4836        let testdata = arrow::util::test_util::parquet_test_data();
4837
4838        {
4839            // `alltypes_tiny_pages.parquet` has page index
4840            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4841            let test_file = File::open(path).unwrap();
4842            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4843                test_file,
4844                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4845            )
4846            .unwrap();
4847            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4848            let reader = builder.build().unwrap();
4849            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4850            assert_eq!(batches.len(), 8);
4851        }
4852
4853        {
4854            // `alltypes_plain.parquet` doesn't have page index
4855            let path = format!("{testdata}/alltypes_plain.parquet");
4856            let test_file = File::open(path).unwrap();
4857            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4858                test_file,
4859                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4860            )
4861            .unwrap();
4862            // Although the file has no page index (`offset_index()` returns `None`),
4863            // we should still read the file successfully.
4864            assert!(builder.metadata().offset_index().is_none());
4865            let reader = builder.build().unwrap();
4866            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4867            assert_eq!(batches.len(), 1);
4868        }
4869    }
4870
4871    #[test]
4872    fn test_raw_repetition() {
4873        const MESSAGE_TYPE: &str = "
4874            message Log {
4875              OPTIONAL INT32 eventType;
4876              REPEATED INT32 category;
4877              REPEATED group filter {
4878                OPTIONAL INT32 error;
4879              }
4880            }
4881        ";
4882        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4883        let props = Default::default();
4884
4885        let mut buf = Vec::with_capacity(1024);
4886        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4887        let mut row_group_writer = writer.next_row_group().unwrap();
4888
4889        // column 0
4890        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4891        col_writer
4892            .typed::<Int32Type>()
4893            .write_batch(&[1], Some(&[1]), None)
4894            .unwrap();
4895        col_writer.close().unwrap();
4896        // column 1
4897        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4898        col_writer
4899            .typed::<Int32Type>()
4900            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4901            .unwrap();
4902        col_writer.close().unwrap();
4903        // column 2
4904        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4905        col_writer
4906            .typed::<Int32Type>()
4907            .write_batch(&[1], Some(&[1]), Some(&[0]))
4908            .unwrap();
4909        col_writer.close().unwrap();
4910
4911        let rg_md = row_group_writer.close().unwrap();
4912        assert_eq!(rg_md.num_rows(), 1);
4913        writer.close().unwrap();
4914
4915        let bytes = Bytes::from(buf);
4916
4917        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4918        let full = no_mask.next().unwrap().unwrap();
4919
4920        assert_eq!(full.num_columns(), 3);
4921
4922        for idx in 0..3 {
4923            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4924            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4925            let mut reader = b.with_projection(mask).build().unwrap();
4926            let projected = reader.next().unwrap().unwrap();
4927
4928            assert_eq!(projected.num_columns(), 1);
4929            assert_eq!(full.column(idx), projected.column(0));
4930        }
4931    }
4932
4933    #[test]
4934    fn test_read_lz4_raw() {
4935        let testdata = arrow::util::test_util::parquet_test_data();
4936        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4937        let file = File::open(path).unwrap();
4938
4939        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4940            .unwrap()
4941            .collect::<Result<Vec<_>, _>>()
4942            .unwrap();
4943        assert_eq!(batches.len(), 1);
4944        let batch = &batches[0];
4945
4946        assert_eq!(batch.num_columns(), 3);
4947        assert_eq!(batch.num_rows(), 4);
4948
4949        // https://github.com/apache/parquet-testing/pull/18
4950        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4951        assert_eq!(
4952            a.values(),
4953            &[1593604800, 1593604800, 1593604801, 1593604801]
4954        );
4955
4956        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4957        let a: Vec<_> = a.iter().flatten().collect();
4958        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4959
4960        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4961        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4962    }
4963
4964    // This test ensures backward compatibility by reading 2 files that use the LZ4 CompressionCodec
4965    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4966    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4967    //    LZ4_HADOOP algorithm for compression.
4968    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4969    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility
4970    //    with older parquet-cpp versions.
4971    //
4972    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4973    #[test]
4974    fn test_read_lz4_hadoop_fallback() {
4975        for file in [
4976            "hadoop_lz4_compressed.parquet",
4977            "non_hadoop_lz4_compressed.parquet",
4978        ] {
4979            let testdata = arrow::util::test_util::parquet_test_data();
4980            let path = format!("{testdata}/{file}");
4981            let file = File::open(path).unwrap();
4982            let expected_rows = 4;
4983
4984            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4985                .unwrap()
4986                .collect::<Result<Vec<_>, _>>()
4987                .unwrap();
4988            assert_eq!(batches.len(), 1);
4989            let batch = &batches[0];
4990
4991            assert_eq!(batch.num_columns(), 3);
4992            assert_eq!(batch.num_rows(), expected_rows);
4993
4994            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4995            assert_eq!(
4996                a.values(),
4997                &[1593604800, 1593604800, 1593604801, 1593604801]
4998            );
4999
5000            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
5001            let b: Vec<_> = b.iter().flatten().collect();
5002            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
5003
5004            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
5005            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
5006        }
5007    }
5008
5009    #[test]
5010    fn test_read_lz4_hadoop_large() {
5011        let testdata = arrow::util::test_util::parquet_test_data();
5012        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
5013        let file = File::open(path).unwrap();
5014        let expected_rows = 10000;
5015
5016        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
5017            .unwrap()
5018            .collect::<Result<Vec<_>, _>>()
5019            .unwrap();
5020        assert_eq!(batches.len(), 1);
5021        let batch = &batches[0];
5022
5023        assert_eq!(batch.num_columns(), 1);
5024        assert_eq!(batch.num_rows(), expected_rows);
5025
5026        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
5027        let a: Vec<_> = a.iter().flatten().collect();
5028        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
5029        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
5030        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
5031        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
5032    }
5033
5034    #[test]
5035    #[cfg(feature = "snap")]
5036    fn test_read_nested_lists() {
5037        let testdata = arrow::util::test_util::parquet_test_data();
5038        let path = format!("{testdata}/nested_lists.snappy.parquet");
5039        let file = File::open(path).unwrap();
5040
5041        let f = file.try_clone().unwrap();
5042        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
5043        let expected = reader.next().unwrap().unwrap();
5044        assert_eq!(expected.num_rows(), 3);
5045
5046        let selection = RowSelection::from(vec![
5047            RowSelector::skip(1),
5048            RowSelector::select(1),
5049            RowSelector::skip(1),
5050        ]);
5051        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5052            .unwrap()
5053            .with_row_selection(selection)
5054            .build()
5055            .unwrap();
5056
5057        let actual = reader.next().unwrap().unwrap();
5058        assert_eq!(actual.num_rows(), 1);
5059        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
5060    }
5061
5062    #[test]
5063    fn test_arbitrary_decimal() {
5064        let values = [1, 2, 3, 4, 5, 6, 7, 8];
5065        let decimals_19_0 = Decimal128Array::from_iter_values(values)
5066            .with_precision_and_scale(19, 0)
5067            .unwrap();
5068        let decimals_12_0 = Decimal128Array::from_iter_values(values)
5069            .with_precision_and_scale(12, 0)
5070            .unwrap();
5071        let decimals_17_10 = Decimal128Array::from_iter_values(values)
5072            .with_precision_and_scale(17, 10)
5073            .unwrap();
5074
5075        let written = RecordBatch::try_from_iter([
5076            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
5077            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
5078            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
5079        ])
5080        .unwrap();
5081
5082        let mut buffer = Vec::with_capacity(1024);
5083        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
5084        writer.write(&written).unwrap();
5085        writer.close().unwrap();
5086
5087        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
5088            .unwrap()
5089            .collect::<Result<Vec<_>, _>>()
5090            .unwrap();
5091
5092        assert_eq!(&written.slice(0, 8), &read[0]);
5093    }
5094
5095    #[test]
5096    fn test_list_skip() {
5097        let mut list = ListBuilder::new(Int32Builder::new());
5098        list.append_value([Some(1), Some(2)]);
5099        list.append_value([Some(3)]);
5100        list.append_value([Some(4)]);
5101        let list = list.finish();
5102        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
5103
5104        // First page contains 2 values but only 1 row
5105        let props = WriterProperties::builder()
5106            .set_data_page_row_count_limit(1)
5107            .set_write_batch_size(2)
5108            .build();
5109
5110        let mut buffer = Vec::with_capacity(1024);
5111        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
5112        writer.write(&batch).unwrap();
5113        writer.close().unwrap();
5114
5115        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
5116        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
5117            .unwrap()
5118            .with_row_selection(selection.into())
5119            .build()
5120            .unwrap();
5121        let out = reader.next().unwrap().unwrap();
5122        assert_eq!(out.num_rows(), 1);
5123        assert_eq!(out, batch.slice(2, 1));
5124    }
5125
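    /// Round-trips a `Decimal32` column through parquet and checks that it is
    /// written with the INT32 physical type and read back unchanged.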
5126    fn test_decimal32_roundtrip() {
5127        let d = |values: Vec<i32>, p: u8| {
5128            let iter = values.into_iter();
5129            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
5130                .with_precision_and_scale(p, 2)
5131                .unwrap()
5132        };
5133
5134        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5135        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
5136
5137        let mut buffer = Vec::with_capacity(1024);
5138        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5139        writer.write(&batch).unwrap();
5140        writer.close().unwrap();
5141
5142        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5143        let t1 = builder.parquet_schema().columns()[0].physical_type();
5144        assert_eq!(t1, PhysicalType::INT32);
5145
5146        let mut reader = builder.build().unwrap();
5147        assert_eq!(batch.schema(), reader.schema());
5148
5149        let out = reader.next().unwrap().unwrap();
5150        assert_eq!(batch, out);
5151    }
5152
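    /// Round-trips `Decimal64` columns of precision 9, 10 and 18 and checks
    /// the chosen physical types (INT32 for precision <= 9, INT64 otherwise)
    /// as well as the round-tripped values.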
5153    fn test_decimal64_roundtrip() {
5154        // Precision <= 9 -> INT32
5155        // Precision <= 18 -> INT64
5156
5157        let d = |values: Vec<i64>, p: u8| {
5158            let iter = values.into_iter();
5159            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5160                .with_precision_and_scale(p, 2)
5161                .unwrap()
5162        };
5163
5164        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5165        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5166        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5167
5168        let batch = RecordBatch::try_from_iter([
5169            ("d1", Arc::new(d1) as ArrayRef),
5170            ("d2", Arc::new(d2) as ArrayRef),
5171            ("d3", Arc::new(d3) as ArrayRef),
5172        ])
5173        .unwrap();
5174
5175        let mut buffer = Vec::with_capacity(1024);
5176        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5177        writer.write(&batch).unwrap();
5178        writer.close().unwrap();
5179
5180        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5181        let t1 = builder.parquet_schema().columns()[0].physical_type();
5182        assert_eq!(t1, PhysicalType::INT32);
5183        let t2 = builder.parquet_schema().columns()[1].physical_type();
5184        assert_eq!(t2, PhysicalType::INT64);
5185        let t3 = builder.parquet_schema().columns()[2].physical_type();
5186        assert_eq!(t3, PhysicalType::INT64);
5187
5188        let mut reader = builder.build().unwrap();
5189        assert_eq!(batch.schema(), reader.schema());
5190
5191        let out = reader.next().unwrap().unwrap();
5192        assert_eq!(batch, out);
5193    }
5194
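    /// Round-trips decimal columns of the given `DecimalType` with precisions
    /// 9, 10, 18 and 19, covering the INT32, INT64 and FIXED_LEN_BYTE_ARRAY
    /// physical representations.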
5195    fn test_decimal_roundtrip<T: DecimalType>() {
5196        // Precision <= 9 -> INT32
5197        // Precision <= 18 -> INT64
5198        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5199
5200        let d = |values: Vec<usize>, p: u8| {
5201            let iter = values.into_iter().map(T::Native::usize_as);
5202            PrimitiveArray::<T>::from_iter_values(iter)
5203                .with_precision_and_scale(p, 2)
5204                .unwrap()
5205        };
5206
5207        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5208        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5209        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5210        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5211
5212        let batch = RecordBatch::try_from_iter([
5213            ("d1", Arc::new(d1) as ArrayRef),
5214            ("d2", Arc::new(d2) as ArrayRef),
5215            ("d3", Arc::new(d3) as ArrayRef),
5216            ("d4", Arc::new(d4) as ArrayRef),
5217        ])
5218        .unwrap();
5219
5220        let mut buffer = Vec::with_capacity(1024);
5221        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5222        writer.write(&batch).unwrap();
5223        writer.close().unwrap();
5224
5225        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5226        let t1 = builder.parquet_schema().columns()[0].physical_type();
5227        assert_eq!(t1, PhysicalType::INT32);
5228        let t2 = builder.parquet_schema().columns()[1].physical_type();
5229        assert_eq!(t2, PhysicalType::INT64);
5230        let t3 = builder.parquet_schema().columns()[2].physical_type();
5231        assert_eq!(t3, PhysicalType::INT64);
5232        let t4 = builder.parquet_schema().columns()[3].physical_type();
5233        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5234
5235        let mut reader = builder.build().unwrap();
5236        assert_eq!(batch.schema(), reader.schema());
5237
5238        let out = reader.next().unwrap().unwrap();
5239        assert_eq!(batch, out);
5240    }
5241
5242    #[test]
5243    fn test_decimal() {
5244        test_decimal32_roundtrip();
5245        test_decimal64_roundtrip();
5246        test_decimal_roundtrip::<Decimal128Type>();
5247        test_decimal_roundtrip::<Decimal256Type>();
5248    }
5249
5250    #[test]
5251    fn test_list_selection() {
5252        let schema = Arc::new(Schema::new(vec![Field::new_list(
5253            "list",
5254            Field::new_list_field(ArrowDataType::Utf8, true),
5255            false,
5256        )]));
5257        let mut buf = Vec::with_capacity(1024);
5258
5259        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5260
5261        for i in 0..2 {
5262            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5263            for j in 0..1024 {
5264                list_a_builder.values().append_value(format!("{i} {j}"));
5265                list_a_builder.append(true);
5266            }
5267            let batch =
5268                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5269                    .unwrap();
5270            writer.write(&batch).unwrap();
5271        }
5272        let _metadata = writer.close().unwrap();
5273
5274        let buf = Bytes::from(buf);
5275        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5276            .unwrap()
5277            .with_row_selection(RowSelection::from(vec![
5278                RowSelector::skip(100),
5279                RowSelector::select(924),
5280                RowSelector::skip(100),
5281                RowSelector::select(924),
5282            ]))
5283            .build()
5284            .unwrap();
5285
5286        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5287        let batch = concat_batches(&schema, &batches).unwrap();
5288
5289        assert_eq!(batch.num_rows(), 924 * 2);
5290        let list = batch.column(0).as_list::<i32>();
5291
5292        for w in list.value_offsets().windows(2) {
5293            assert_eq!(w[0] + 1, w[1])
5294        }
5295        let mut values = list.values().as_string::<i32>().iter();
5296
5297        for i in 0..2 {
5298            for j in 100..1024 {
5299                let expected = format!("{i} {j}");
5300                assert_eq!(values.next().unwrap().unwrap(), &expected);
5301            }
5302        }
5303    }
5304
5305    #[test]
5306    fn test_list_selection_fuzz() {
5307        let mut rng = rng();
5308        let schema = Arc::new(Schema::new(vec![Field::new_list(
5309            "list",
5310            Field::new_list(
5311                Field::LIST_FIELD_DEFAULT_NAME,
5312                Field::new_list_field(ArrowDataType::Int32, true),
5313                true,
5314            ),
5315            true,
5316        )]));
5317        let mut buf = Vec::with_capacity(1024);
5318        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5319
5320        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5321
5322        for _ in 0..2048 {
5323            if rng.random_bool(0.2) {
5324                list_a_builder.append(false);
5325                continue;
5326            }
5327
5328            let list_a_len = rng.random_range(0..10);
5329            let list_b_builder = list_a_builder.values();
5330
5331            for _ in 0..list_a_len {
5332                if rng.random_bool(0.2) {
5333                    list_b_builder.append(false);
5334                    continue;
5335                }
5336
5337                let list_b_len = rng.random_range(0..10);
5338                let int_builder = list_b_builder.values();
5339                for _ in 0..list_b_len {
5340                    match rng.random_bool(0.2) {
5341                        true => int_builder.append_null(),
5342                        false => int_builder.append_value(rng.random()),
5343                    }
5344                }
5345                list_b_builder.append(true)
5346            }
5347            list_a_builder.append(true);
5348        }
5349
5350        let array = Arc::new(list_a_builder.finish());
5351        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5352
5353        writer.write(&batch).unwrap();
5354        let _metadata = writer.close().unwrap();
5355
5356        let buf = Bytes::from(buf);
5357
5358        let cases = [
5359            vec![
5360                RowSelector::skip(100),
5361                RowSelector::select(924),
5362                RowSelector::skip(100),
5363                RowSelector::select(924),
5364            ],
5365            vec![
5366                RowSelector::select(924),
5367                RowSelector::skip(100),
5368                RowSelector::select(924),
5369                RowSelector::skip(100),
5370            ],
5371            vec![
5372                RowSelector::skip(1023),
5373                RowSelector::select(1),
5374                RowSelector::skip(1023),
5375                RowSelector::select(1),
5376            ],
5377            vec![
5378                RowSelector::select(1),
5379                RowSelector::skip(1023),
5380                RowSelector::select(1),
5381                RowSelector::skip(1023),
5382            ],
5383        ];
5384
5385        for batch_size in [100, 1024, 2048] {
5386            for selection in &cases {
5387                let selection = RowSelection::from(selection.clone());
5388                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5389                    .unwrap()
5390                    .with_row_selection(selection.clone())
5391                    .with_batch_size(batch_size)
5392                    .build()
5393                    .unwrap();
5394
5395                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5396                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5397                assert_eq!(actual.num_rows(), selection.row_count());
5398
5399                let mut batch_offset = 0;
5400                let mut actual_offset = 0;
5401                for selector in selection.iter() {
5402                    if selector.skip {
5403                        batch_offset += selector.row_count;
5404                        continue;
5405                    }
5406
5407                    assert_eq!(
5408                        batch.slice(batch_offset, selector.row_count),
5409                        actual.slice(actual_offset, selector.row_count)
5410                    );
5411
5412                    batch_offset += selector.row_count;
5413                    actual_offset += selector.row_count;
5414                }
5415            }
5416        }
5417    }
5418
5419    #[test]
5420    fn test_read_old_nested_list() {
5421        use arrow::datatypes::DataType;
5422        use arrow::datatypes::ToByteSlice;
5423
5424        let testdata = arrow::util::test_util::parquet_test_data();
5425        // message my_record {
5426        //     REQUIRED group a (LIST) {
5427        //         REPEATED group array (LIST) {
5428        //             REPEATED INT32 array;
5429        //         }
5430        //     }
5431        // }
5432        // should be read as list<list<int32>>
5433        let path = format!("{testdata}/old_list_structure.parquet");
5434        let test_file = File::open(path).unwrap();
5435
5436        // create expected ListArray
5437        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5438
5439        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5440        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5441
5442        // Construct a list array from the above two
5443        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5444            "array",
5445            DataType::Int32,
5446            false,
5447        ))))
5448        .len(2)
5449        .add_buffer(a_value_offsets)
5450        .add_child_data(a_values.into_data())
5451        .build()
5452        .unwrap();
5453        let a = ListArray::from(a_list_data);
5454
5455        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5456        let mut reader = builder.build().unwrap();
5457        let out = reader.next().unwrap().unwrap();
5458        assert_eq!(out.num_rows(), 1);
5459        assert_eq!(out.num_columns(), 1);
5460        // grab first column
5461        let c0 = out.column(0);
5462        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5463        // get first row: [[1, 2], [3, 4]]
5464        let r0 = c0arr.value(0);
5465        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5466        assert_eq!(r0arr, &a);
5467    }
5468
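    // A MAP column whose key_value entries carry no value field should be read
    // as a list of its keys, equivalent to a plain LIST column of the same keys.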
5469    #[test]
5470    fn test_map_no_value() {
5471        // File schema:
5472        // message schema {
5473        //   required group my_map (MAP) {
5474        //     repeated group key_value {
5475        //       required int32 key;
5476        //       optional int32 value;
5477        //     }
5478        //   }
5479        //   required group my_map_no_v (MAP) {
5480        //     repeated group key_value {
5481        //       required int32 key;
5482        //     }
5483        //   }
5484        //   required group my_list (LIST) {
5485        //     repeated group list {
5486        //       required int32 element;
5487        //     }
5488        //   }
5489        // }
5490        let testdata = arrow::util::test_util::parquet_test_data();
5491        let path = format!("{testdata}/map_no_value.parquet");
5492        let file = File::open(path).unwrap();
5493
5494        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5495            .unwrap()
5496            .build()
5497            .unwrap();
5498        let out = reader.next().unwrap().unwrap();
5499        assert_eq!(out.num_rows(), 3);
5500        assert_eq!(out.num_columns(), 3);
5501        // my_map_no_v (a MAP with no value field) should be read as a list of its keys, equivalent to the my_list column
5502        let c0 = out.column(1).as_list::<i32>();
5503        let c1 = out.column(2).as_list::<i32>();
5504        assert_eq!(c0.len(), c1.len());
5505        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5506    }
5507
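    // A file containing a logical type annotation this crate does not recognize
    // should still be readable, falling back to the column's physical type.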
5508    #[test]
5509    fn test_read_unknown_logical_type() {
5510        let testdata = arrow::util::test_util::parquet_test_data();
5511        let path = format!("{testdata}/unknown-logical-type.parquet");
5512        let test_file = File::open(path).unwrap();
5513
5514        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5515            .expect("Error creating reader builder");
5516
5517        let schema = builder.metadata().file_metadata().schema_descr();
5518        assert_eq!(
5519            schema.column(0).logical_type_ref(),
5520            Some(&LogicalType::String)
5521        );
5522        assert_eq!(
5523            schema.column(1).logical_type_ref(),
5524            Some(&LogicalType::_Unknown { field_id: 2555 })
5525        );
5526        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5527
5528        let mut reader = builder.build().unwrap();
5529        let out = reader.next().unwrap().unwrap();
5530        assert_eq!(out.num_rows(), 3);
5531        assert_eq!(out.num_columns(), 2);
5532    }
5533
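    // Requesting a virtual row number column via
    // `ArrowReaderOptions::with_virtual_columns` appends an Int64 column to each
    // batch containing the position of every row within the file.
    //
    // A minimal usage sketch, mirroring the calls in the test below:
    //
    //   let options = ArrowReaderOptions::new()
    //       .with_virtual_columns(vec![row_number_field.clone()])?;
    //   let reader =
    //       ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;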
5534    #[test]
5535    fn test_read_row_numbers() {
5536        let file = write_parquet_from_iter(vec![(
5537            "value",
5538            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5539        )]);
5540        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5541
5542        let row_number_field = Arc::new(
5543            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5544        );
5545
5546        let options = ArrowReaderOptions::new()
5547            .with_schema(Arc::new(Schema::new(supplied_fields)))
5548            .with_virtual_columns(vec![row_number_field.clone()])
5549            .unwrap();
5550        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5551            file.try_clone().unwrap(),
5552            options,
5553        )
5554        .expect("reader builder with schema")
5555        .build()
5556        .expect("reader with schema");
5557
5558        let batch = arrow_reader.next().unwrap().unwrap();
5559        let schema = Arc::new(Schema::new(vec![
5560            Field::new("value", ArrowDataType::Int64, false),
5561            (*row_number_field).clone(),
5562        ]));
5563
5564        assert_eq!(batch.schema(), schema);
5565        assert_eq!(batch.num_columns(), 2);
5566        assert_eq!(batch.num_rows(), 3);
5567        assert_eq!(
5568            batch
5569                .column(0)
5570                .as_primitive::<types::Int64Type>()
5571                .iter()
5572                .collect::<Vec<_>>(),
5573            vec![Some(1), Some(2), Some(3)]
5574        );
5575        assert_eq!(
5576            batch
5577                .column(1)
5578                .as_primitive::<types::Int64Type>()
5579                .iter()
5580                .collect::<Vec<_>>(),
5581            vec![Some(0), Some(1), Some(2)]
5582        );
5583    }
5584
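    // With an empty projection (`ProjectionMask::none`), only the requested
    // virtual row number column should be returned.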
5585    #[test]
5586    fn test_read_only_row_numbers() {
5587        let file = write_parquet_from_iter(vec![(
5588            "value",
5589            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5590        )]);
5591        let row_number_field = Arc::new(
5592            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5593        );
5594        let options = ArrowReaderOptions::new()
5595            .with_virtual_columns(vec![row_number_field.clone()])
5596            .unwrap();
5597        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5598        let num_columns = metadata
5599            .metadata
5600            .file_metadata()
5601            .schema_descr()
5602            .num_columns();
5603
5604        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5605            .with_projection(ProjectionMask::none(num_columns))
5606            .build()
5607            .expect("reader with schema");
5608
5609        let batch = arrow_reader.next().unwrap().unwrap();
5610        let schema = Arc::new(Schema::new(vec![row_number_field]));
5611
5612        assert_eq!(batch.schema(), schema);
5613        assert_eq!(batch.num_columns(), 1);
5614        assert_eq!(batch.num_rows(), 3);
5615        assert_eq!(
5616            batch
5617                .column(0)
5618                .as_primitive::<types::Int64Type>()
5619                .iter()
5620                .collect::<Vec<_>>(),
5621            vec![Some(0), Some(1), Some(2)]
5622        );
5623    }
5624
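    // Row numbers are assigned from each row's position in the file, so reading
    // row groups out of order must yield the corresponding out-of-order row numbers.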
5625    #[test]
5626    fn test_read_row_numbers_row_group_order() -> Result<()> {
5627        // Make a parquet file with 100 rows split across 2 row groups
5628        let array = Int64Array::from_iter_values(5000..5100);
5629        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5630        let mut buffer = Vec::new();
5631        let options = WriterProperties::builder()
5632            .set_max_row_group_size(50)
5633            .build();
5634        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5635        // write in 10-row batches, since the row group size limit is enforced after each batch
5636        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5637            writer.write(&batch_chunk)?;
5638        }
5639        writer.close()?;
5640
5641        let row_number_field = Arc::new(
5642            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5643        );
5644
5645        let buffer = Bytes::from(buffer);
5646
5647        let options =
5648            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5649
5650        // first, read with the default row group order
5651        let arrow_reader =
5652            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5653                .build()?;
5654
5655        assert_eq!(
5656            ValuesAndRowNumbers {
5657                values: (5000..5100).collect(),
5658                row_numbers: (0..100).collect()
5659            },
5660            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5661        );
5662
5663        // Now read the row groups out of order
5664        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5665            .with_row_groups(vec![1, 0])
5666            .build()?;
5667
5668        assert_eq!(
5669            ValuesAndRowNumbers {
5670                values: (5050..5100).chain(5000..5050).collect(),
5671                row_numbers: (50..100).chain(0..50).collect(),
5672            },
5673            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5674        );
5675
5676        Ok(())
5677    }
5678
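    // Collects the `col` values and the `row_number` column from every batch
    // produced by a reader, for comparison in the row number tests above.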
5679    #[derive(Debug, PartialEq)]
5680    struct ValuesAndRowNumbers {
5681        values: Vec<i64>,
5682        row_numbers: Vec<i64>,
5683    }
5684    impl ValuesAndRowNumbers {
5685        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5686            let mut values = vec![];
5687            let mut row_numbers = vec![];
5688            for batch in reader {
5689                let batch = batch.expect("Could not read batch");
5690                values.extend(
5691                    batch
5692                        .column_by_name("col")
5693                        .expect("Could not get col column")
5694                        .as_primitive::<arrow::datatypes::Int64Type>()
5695                        .iter()
5696                        .map(|v| v.expect("Could not get value")),
5697                );
5698
5699                row_numbers.extend(
5700                    batch
5701                        .column_by_name("row_number")
5702                        .expect("Could not get row_number column")
5703                        .as_primitive::<arrow::datatypes::Int64Type>()
5704                        .iter()
5705                        .map(|v| v.expect("Could not get row number")),
5707                );
5708            }
5709            Self {
5710                values,
5711                row_numbers,
5712            }
5713        }
5714    }
5715
5716    #[test]
5717    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5718        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5719        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5720        assert_eq!(
5721            ArrowReaderOptions::new()
5722                .with_virtual_columns(vec![regular_field])
5723                .unwrap_err()
5724                .to_string(),
5725            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5726        );
5727    }
5728
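    // Randomized check that row numbers remain correct when a random row
    // selection is applied across multiple row groups.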
5729    #[test]
5730    fn test_row_numbers_with_multiple_row_groups() {
5731        test_row_numbers_with_multiple_row_groups_helper(
5732            false,
5733            |path, selection, _row_filter, batch_size| {
5734                let file = File::open(path).unwrap();
5735                let row_number_field = Arc::new(
5736                    Field::new("row_number", ArrowDataType::Int64, false)
5737                        .with_extension_type(RowNumber),
5738                );
5739                let options = ArrowReaderOptions::new()
5740                    .with_virtual_columns(vec![row_number_field])
5741                    .unwrap();
5742                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5743                    .unwrap()
5744                    .with_row_selection(selection)
5745                    .with_batch_size(batch_size)
5746                    .build()
5747                    .expect("Could not create reader");
5748                reader
5749                    .collect::<Result<Vec<_>, _>>()
5750                    .expect("Could not read")
5751            },
5752        );
5753    }
5754
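    // Same as the test above, but additionally applies a random row filter.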
5755    #[test]
5756    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5757        test_row_numbers_with_multiple_row_groups_helper(
5758            true,
5759            |path, selection, row_filter, batch_size| {
5760                let file = File::open(path).unwrap();
5761                let row_number_field = Arc::new(
5762                    Field::new("row_number", ArrowDataType::Int64, false)
5763                        .with_extension_type(RowNumber),
5764                );
5765                let options = ArrowReaderOptions::new()
5766                    .with_virtual_columns(vec![row_number_field])
5767                    .unwrap();
5768                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5769                    .unwrap()
5770                    .with_row_selection(selection)
5771                    .with_batch_size(batch_size)
5772                    .with_row_filter(row_filter.expect("No filter"))
5773                    .build()
5774                    .expect("Could not create reader");
5775                reader
5776                    .collect::<Result<Vec<_>, _>>()
5777                    .expect("Could not read")
5778            },
5779        );
5780    }
5781
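    // Requesting a virtual `RowGroupIndex` column should append an Int64 column
    // containing, for each row, the index of the row group it was read from.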
5782    #[test]
5783    fn test_read_row_group_indices() {
5784        // create a parquet file with 3 row groups, 2 rows each
5785        let array1 = Int64Array::from(vec![1, 2]);
5786        let array2 = Int64Array::from(vec![3, 4]);
5787        let array3 = Int64Array::from(vec![5, 6]);
5788
5789        let batch1 =
5790            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5791        let batch2 =
5792            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5793        let batch3 =
5794            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5795
5796        let mut buffer = Vec::new();
5797        let options = WriterProperties::builder()
5798            .set_max_row_group_size(2)
5799            .build();
5800        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5801        writer.write(&batch1).unwrap();
5802        writer.write(&batch2).unwrap();
5803        writer.write(&batch3).unwrap();
5804        writer.close().unwrap();
5805
5806        let file = Bytes::from(buffer);
5807        let row_group_index_field = Arc::new(
5808            Field::new("row_group_index", ArrowDataType::Int64, false)
5809                .with_extension_type(RowGroupIndex),
5810        );
5811
5812        let options = ArrowReaderOptions::new()
5813            .with_virtual_columns(vec![row_group_index_field.clone()])
5814            .unwrap();
5815        let mut arrow_reader =
5816            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5817                .expect("reader builder with virtual columns")
5818                .build()
5819                .expect("reader with virtual columns");
5820
5821        let batch = arrow_reader.next().unwrap().unwrap();
5822
5823        assert_eq!(batch.num_columns(), 2);
5824        assert_eq!(batch.num_rows(), 6);
5825
5826        assert_eq!(
5827            batch
5828                .column(0)
5829                .as_primitive::<types::Int64Type>()
5830                .iter()
5831                .collect::<Vec<_>>(),
5832            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5833        );
5834
5835        assert_eq!(
5836            batch
5837                .column(1)
5838                .as_primitive::<types::Int64Type>()
5839                .iter()
5840                .collect::<Vec<_>>(),
5841            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5842        );
5843    }
5844
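    // With an empty projection, only the virtual row group index column should
    // be returned, still reflecting each row's originating row group.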
5845    #[test]
5846    fn test_read_only_row_group_indices() {
5847        let array1 = Int64Array::from(vec![1, 2, 3]);
5848        let array2 = Int64Array::from(vec![4, 5]);
5849
5850        let batch1 =
5851            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5852        let batch2 =
5853            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5854
5855        let mut buffer = Vec::new();
5856        let options = WriterProperties::builder()
5857            .set_max_row_group_size(3)
5858            .build();
5859        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5860        writer.write(&batch1).unwrap();
5861        writer.write(&batch2).unwrap();
5862        writer.close().unwrap();
5863
5864        let file = Bytes::from(buffer);
5865        let row_group_index_field = Arc::new(
5866            Field::new("row_group_index", ArrowDataType::Int64, false)
5867                .with_extension_type(RowGroupIndex),
5868        );
5869
5870        let options = ArrowReaderOptions::new()
5871            .with_virtual_columns(vec![row_group_index_field.clone()])
5872            .unwrap();
5873        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5874        let num_columns = metadata
5875            .metadata
5876            .file_metadata()
5877            .schema_descr()
5878            .num_columns();
5879
5880        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5881            .with_projection(ProjectionMask::none(num_columns))
5882            .build()
5883            .expect("reader with virtual columns only");
5884
5885        let batch = arrow_reader.next().unwrap().unwrap();
5886        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5887
5888        assert_eq!(batch.schema(), schema);
5889        assert_eq!(batch.num_columns(), 1);
5890        assert_eq!(batch.num_rows(), 5);
5891
5892        assert_eq!(
5893            batch
5894                .column(0)
5895                .as_primitive::<types::Int64Type>()
5896                .iter()
5897                .collect::<Vec<_>>(),
5898            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5899        );
5900    }
5901
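    // Reading a subset of row groups in a caller-specified (reverse) order
    // should report the original row group index for every row.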
5902    #[test]
5903    fn test_read_row_group_indices_with_selection() -> Result<()> {
5904        let mut buffer = Vec::new();
5905        let options = WriterProperties::builder()
5906            .set_max_row_group_size(10)
5907            .build();
5908
5909        let schema = Arc::new(Schema::new(vec![Field::new(
5910            "value",
5911            ArrowDataType::Int64,
5912            false,
5913        )]));
5914
5915        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5916
5917        // write out 3 batches of 10 rows each
5918        for i in 0..3 {
5919            let start = i * 10;
5920            let array = Int64Array::from_iter_values(start..start + 10);
5921            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5922            writer.write(&batch)?;
5923        }
5924        writer.close()?;
5925
5926        let file = Bytes::from(buffer);
5927        let row_group_index_field = Arc::new(
5928            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5929        );
5930
5931        let options =
5932            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5933
5934        // request the row groups in reverse order and verify the output follows that order
5935        let arrow_reader =
5936            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5937                .with_row_groups(vec![2, 1, 0])
5938                .build()?;
5939
5940        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5941        let combined = concat_batches(&batches[0].schema(), &batches)?;
5942
5943        let values = combined.column(0).as_primitive::<types::Int64Type>();
5944        let first_val = values.value(0);
5945        let last_val = values.value(combined.num_rows() - 1);
5946        // the first row comes from row group 2
5947        assert_eq!(first_val, 20);
5948        // the last row comes from row group 0
5949        assert_eq!(last_val, 9);
5950
5951        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5952        assert_eq!(rg_indices.value(0), 2);
5953        assert_eq!(rg_indices.value(10), 1);
5954        assert_eq!(rg_indices.value(20), 0);
5955
5956        Ok(())
5957    }
5958
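    // Shared driver for the row number tests: writes a random file with randomly
    // sized row groups, builds a random row selection (and optionally a random
    // row filter), invokes `test_case` to produce batches, and then checks that
    // each returned row number matches its value (values are 1-based row numbers).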
5959    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5960        use_filter: bool,
5961        test_case: F,
5962    ) where
5963        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5964    {
5965        let seed: u64 = random();
5966        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
5967        let mut rng = StdRng::seed_from_u64(seed);
5968
5969        use tempfile::TempDir;
5970        let tempdir = TempDir::new().expect("Could not create temp dir");
5971
5972        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5973
5974        let path = tempdir.path().join("test.parquet");
5975        std::fs::write(&path, bytes).expect("Could not write file");
5976
5977        let mut case = vec![];
5978        let mut remaining = metadata.file_metadata().num_rows();
5979        while remaining > 0 {
5980            let row_count = rng.random_range(1..=remaining);
5981            remaining -= row_count;
5982            case.push(RowSelector {
5983                row_count: row_count as usize,
5984                skip: rng.random_bool(0.5),
5985            });
5986        }
5987
5988        let filter = use_filter.then(|| {
5989            let filter = (0..metadata.file_metadata().num_rows())
5990                .map(|_| rng.random_bool(0.99))
5991                .collect::<Vec<_>>();
5992            let mut filter_offset = 0;
5993            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5994                ProjectionMask::all(),
5995                move |b| {
5996                    let array = BooleanArray::from_iter(
5997                        filter
5998                            .iter()
5999                            .skip(filter_offset)
6000                            .take(b.num_rows())
6001                            .map(|x| Some(*x)),
6002                    );
6003                    filter_offset += b.num_rows();
6004                    Ok(array)
6005                },
6006            ))])
6007        });
6008
6009        let selection = RowSelection::from(case);
6010        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
6011
6012        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
6013            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
6014            return;
6015        }
6016        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
6017            .expect("Failed to concatenate");
6018        // Note: `selection.row_count()` is not asserted against `actual.num_rows()` here,
6019        // because the optional row filter may remove additional rows beyond the selection.
6019        let values = actual
6020            .column(0)
6021            .as_primitive::<types::Int64Type>()
6022            .iter()
6023            .collect::<Vec<_>>();
6024        let row_numbers = actual
6025            .column(1)
6026            .as_primitive::<types::Int64Type>()
6027            .iter()
6028            .collect::<Vec<_>>();
6029        assert_eq!(
6030            row_numbers
6031                .into_iter()
6032                .map(|number| number.map(|number| number + 1))
6033                .collect::<Vec<_>>(),
6034            values
6035        );
6036    }
6037
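    // Writes a single Int64 column with values 1..=N (N chosen at random) to an
    // in-memory Parquet file, flushing after each batch so the file is split into
    // randomly sized row groups. Returns the encoded bytes and the file metadata.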
6038    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
6039        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
6040            "value",
6041            ArrowDataType::Int64,
6042            false,
6043        )])));
6044
6045        let mut buf = Vec::with_capacity(1024);
6046        let mut writer =
6047            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
6048
6049        let mut values = 1..=rng.random_range(1..4096);
6050        while !values.is_empty() {
6051            let batch_values = values
6052                .by_ref()
6053                .take(rng.random_range(1..4096))
6054                .collect::<Vec<_>>();
6055            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
6056            let batch =
6057                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
6058            writer.write(&batch).expect("Could not write batch");
6059            writer.flush().expect("Could not flush");
6060        }
6061        let metadata = writer.close().expect("Could not close writer");
6062
6063        (Bytes::from(buf), metadata)
6064    }
6065}