Skip to main content

parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains readers that decode Parquet data into Arrow [`RecordBatch`]es
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Default batch size for reading parquet files
61pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
63/// Builder for constructing Parquet readers that decode into [Apache Arrow]
64/// arrays.
65///
66/// Most users should use one of the following specializations:
67///
68/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
69/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
70/// * decoder API: [`ParquetPushDecoderBuilder`]
71///
72/// # Features
73/// * Projection pushdown: [`Self::with_projection`]
74/// * Cached metadata: [`ArrowReaderMetadata::load`]
75/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
76/// * Row group filtering: [`Self::with_row_groups`]
77/// * Range filtering: [`Self::with_row_selection`]
78/// * Row level filtering: [`Self::with_row_filter`]
79///
80/// # Implementing Predicate Pushdown
81///
82/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
83/// process, which is efficient and allows the most low level optimizations.
84///
85/// However, most Parquet based systems will apply filters at many steps prior
86/// to decoding such as pruning files, row groups and data pages. This crate
87/// provides the low level APIs needed to implement such filtering, but does not
88/// include any logic to actually evaluate predicates. For example:
89///
90/// * [`Self::with_row_groups`] for Row Group pruning
91/// * [`Self::with_row_selection`] for data page pruning
92/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
93///
94/// The rationale for this design is that implementing predicate pushdown is a
95/// complex topic and varies significantly from system to system. For example
96///
97/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
98/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
99/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
100/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
101///
102/// You can read more about this design in the [Querying Parquet with
103/// Millisecond Latency] Arrow blog post.
104///
105/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
106/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
107/// [Apache Arrow]: https://arrow.apache.org/
108/// [`StatisticsConverter`]: statistics::StatisticsConverter
109/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
    /// The "input" to read parquet data from.
    ///
    /// Note in the case of the [`ParquetPushDecoderBuilder`] there is no
    /// underlying reader; the input is instead [`PushDecoderInput`], the buffer that
    /// caller-pushed bytes accumulate in.
    ///
    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
    /// [`PushDecoderInput`]: crate::arrow::push_decoder::PushDecoderInput
    pub(crate) input: T,

    /// Metadata of the parquet file being read; exposed via
    /// [`ArrowReaderBuilder::metadata`].
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema for the file; exposed via [`ArrowReaderBuilder::schema`].
    pub(crate) schema: SchemaRef,

    /// Copied from the [`ArrowReaderMetadata`] the builder is created from.
    ///
    /// NOTE(review): presumably the Parquet-to-Arrow field mapping used when
    /// decoding — confirm against `ArrowReaderMetadata`.
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Size of the [`RecordBatch`]es to produce; set by
    /// [`ArrowReaderBuilder::with_batch_size`].
    pub(crate) batch_size: usize,

    /// Row group indexes to read, when set by
    /// [`ArrowReaderBuilder::with_row_groups`].
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Mask selecting the columns to read; set by
    /// [`ArrowReaderBuilder::with_projection`]. Starts as [`ProjectionMask::all`].
    pub(crate) projection: ProjectionMask,

    /// Optional row filter; set by [`ArrowReaderBuilder::with_row_filter`].
    pub(crate) filter: Option<RowFilter>,

    /// Optional row selection; set by [`ArrowReaderBuilder::with_row_selection`].
    pub(crate) selection: Option<RowSelection>,

    /// How row selections are materialised; set by
    /// [`ArrowReaderBuilder::with_row_selection_policy`].
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Optional limit on the number of rows to read; set by
    /// [`ArrowReaderBuilder::with_limit`].
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip; set by [`ArrowReaderBuilder::with_offset`].
    pub(crate) offset: Option<usize>,

    /// Metrics collection for the reader; set by
    /// [`ArrowReaderBuilder::with_metrics`]. Disabled by default.
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum size (per row group) of the predicate cache in bytes; set by
    /// [`ArrowReaderBuilder::with_max_predicate_cache_size`]. Defaults to 100MB.
    pub(crate) max_predicate_cache_size: usize,
}
147
148impl<T: Debug> Debug for ArrowReaderBuilder<T> {
149    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("ArrowReaderBuilder<T>")
151            .field("input", &self.input)
152            .field("metadata", &self.metadata)
153            .field("schema", &self.schema)
154            .field("fields", &self.fields)
155            .field("batch_size", &self.batch_size)
156            .field("row_groups", &self.row_groups)
157            .field("projection", &self.projection)
158            .field("filter", &self.filter)
159            .field("selection", &self.selection)
160            .field("row_selection_policy", &self.row_selection_policy)
161            .field("limit", &self.limit)
162            .field("offset", &self.offset)
163            .field("metrics", &self.metrics)
164            .finish()
165    }
166}
167
168impl<T> ArrowReaderBuilder<T> {
169    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
170        Self {
171            input,
172            metadata: metadata.metadata,
173            schema: metadata.schema,
174            fields: metadata.fields,
175            batch_size: DEFAULT_BATCH_SIZE,
176            row_groups: None,
177            projection: ProjectionMask::all(),
178            filter: None,
179            selection: None,
180            row_selection_policy: RowSelectionPolicy::default(),
181            limit: None,
182            offset: None,
183            metrics: ArrowReaderMetrics::Disabled,
184            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
185        }
186    }
187
    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    ///
    /// The metadata is held in an [`Arc`], so callers that need an owned
    /// handle can clone the returned reference without copying the metadata
    /// itself.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }
192
193    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
194    pub fn parquet_schema(&self) -> &SchemaDescriptor {
195        self.metadata.file_metadata().schema_descr()
196    }
197
    /// Returns the arrow [`SchemaRef`] for this parquet file
    ///
    /// This is the schema the builder was created with, taken from the
    /// [`ArrowReaderMetadata`] supplied at construction.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
202
203    /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`]
204    /// If the batch_size more than the file row count, use the file row count.
205    pub fn with_batch_size(self, batch_size: usize) -> Self {
206        // Try to avoid allocate large buffer
207        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
208        Self { batch_size, ..self }
209    }
210
211    /// Only read data from the provided row group indexes
212    ///
213    /// This is also called row group filtering
214    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
215        Self {
216            row_groups: Some(row_groups),
217            ..self
218        }
219    }
220
221    /// Only read data from the provided column indexes
222    pub fn with_projection(self, mask: ProjectionMask) -> Self {
223        Self {
224            projection: mask,
225            ..self
226        }
227    }
228
229    /// Configure how row selections should be materialised during execution
230    ///
231    /// See [`RowSelectionPolicy`] for more details
232    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
233        Self {
234            row_selection_policy: policy,
235            ..self
236        }
237    }
238
239    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
240    /// data into memory.
241    ///
242    /// This feature is used to restrict which rows are decoded within row
243    /// groups, skipping ranges of rows that are not needed. Such selections
244    /// could be determined by evaluating predicates against the parquet page
245    /// [`Index`] or some other external information available to a query
246    /// engine.
247    ///
248    /// # Notes
249    ///
250    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
251    /// applying the row selection, and therefore rows from skipped row groups
252    /// should not be included in the [`RowSelection`] (see example below)
253    ///
    /// It is recommended to enable reading the page index if using this
    /// functionality, to allow more efficient skipping over data pages. See
    /// [`ArrowReaderOptions::with_page_index`].
257    ///
258    /// # Example
259    ///
260    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
261    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
262    /// row group 3:
263    ///
264    /// ```text
265    ///   Row Group 0, 1000 rows (selected)
266    ///   Row Group 1, 1000 rows (skipped)
267    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
268    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
269    /// ```
270    ///
271    /// You could pass the following [`RowSelection`]:
272    ///
273    /// ```text
274    ///  Select 1000    (scan all rows in row group 0)
275    ///  Skip 50        (skip the first 50 rows in row group 2)
276    ///  Select 50      (scan rows 50-100 in row group 2)
277    ///  Skip 900       (skip the remaining rows in row group 2)
278    ///  Skip 200       (skip the first 200 rows in row group 3)
279    ///  Select 100     (scan rows 200-300 in row group 3)
280    ///  Skip 700       (skip the remaining rows in row group 3)
281    /// ```
282    /// Note there is no entry for the (entirely) skipped row group 1.
283    ///
284    /// Note you can represent the same selection with fewer entries. Instead of
285    ///
286    /// ```text
287    ///  Skip 900       (skip the remaining rows in row group 2)
288    ///  Skip 200       (skip the first 200 rows in row group 3)
289    /// ```
290    ///
291    /// you could use
292    ///
293    /// ```text
294    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
295    /// ```
296    ///
297    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
298    pub fn with_row_selection(self, selection: RowSelection) -> Self {
299        Self {
300            selection: Some(selection),
301            ..self
302        }
303    }
304
305    /// Provide a [`RowFilter`] to skip decoding rows
306    ///
307    /// Row filters are applied after row group selection and row selection
308    ///
309    /// It is recommended to enable reading the page index if using this functionality, to allow
310    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
311    ///
312    /// See the [blog post on late materialization] for a more technical explanation.
313    ///
314    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
315    ///
316    /// # Example
317    /// ```rust
318    /// # use std::fs::File;
319    /// # use arrow_array::Int32Array;
320    /// # use parquet::arrow::ProjectionMask;
321    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
322    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
323    /// # let testdata = arrow::util::test_util::parquet_test_data();
324    /// # let path = format!("{testdata}/alltypes_plain.parquet");
325    /// # let file = File::open(&path)?;
326    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
327    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
328    /// // Create predicate that evaluates `int_col != 1`.
329    /// // `int_col` column has index 4 (zero based) in the schema
330    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
331    /// // Only the projection columns are passed to the predicate so
332    /// // int_col is column 0 in the predicate
333    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
334    ///     let int_col = batch.column(0);
335    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
336    /// });
337    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
338    /// // The filter will be invoked during the reading process
339    /// let reader = builder.with_row_filter(row_filter).build()?;
340    /// # for b in reader { let _ = b?; }
341    /// # Ok(())
342    /// # }
343    /// ```
344    pub fn with_row_filter(self, filter: RowFilter) -> Self {
345        Self {
346            filter: Some(filter),
347            ..self
348        }
349    }
350
351    /// Provide a limit to the number of rows to be read
352    ///
353    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
354    /// allowing it to limit the final set of rows decoded after any pushed down predicates
355    ///
356    /// It is recommended to enable reading the page index if using this functionality, to allow
357    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
358    pub fn with_limit(self, limit: usize) -> Self {
359        Self {
360            limit: Some(limit),
361            ..self
362        }
363    }
364
365    /// Provide an offset to skip over the given number of rows
366    ///
367    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
368    /// allowing it to skip rows after any pushed down predicates
369    ///
370    /// It is recommended to enable reading the page index if using this functionality, to allow
371    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
372    pub fn with_offset(self, offset: usize) -> Self {
373        Self {
374            offset: Some(offset),
375            ..self
376        }
377    }
378
379    /// Specify metrics collection during reading
380    ///
381    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
382    /// clone of the provided metrics to the builder.
383    ///
384    /// For example:
385    ///
386    /// ```rust
387    /// # use std::sync::Arc;
388    /// # use bytes::Bytes;
389    /// # use arrow_array::{Int32Array, RecordBatch};
390    /// # use arrow_schema::{DataType, Field, Schema};
391    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
392    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
393    /// # use parquet::arrow::ArrowWriter;
394    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
395    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
396    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
397    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
398    /// # writer.write(&batch).unwrap();
399    /// # writer.close().unwrap();
400    /// # let file = Bytes::from(file);
401    /// // Create metrics object to pass into the reader
402    /// let metrics = ArrowReaderMetrics::enabled();
403    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
404    ///   // Configure the builder to use the metrics by passing a clone
405    ///   .with_metrics(metrics.clone())
406    ///   // Build the reader
407    ///   .build().unwrap();
408    /// // .. read data from the reader ..
409    ///
410    /// // check the metrics
411    /// assert!(metrics.records_read_from_inner().is_some());
412    /// ```
413    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
414        Self { metrics, ..self }
415    }
416
417    /// Set the maximum size (per row group) of the predicate cache in bytes for
418    /// the async decoder.
419    ///
420    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
421    /// unlimited cache size.
422    ///
423    /// This cache is used to store decoded arrays that are used in
424    /// predicate evaluation ([`Self::with_row_filter`]).
425    ///
426    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
427    /// [this ticket] for more details and alternatives.
428    ///
429    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
430    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
431    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
432        Self {
433            max_predicate_cache_size,
434            ..self
435        }
436    }
437}
438
439/// Options that control how [`ParquetMetaData`] is read when constructing
440/// an Arrow reader.
441///
442/// To use these options, pass them to one of the following methods:
443/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
444/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
445///
446/// For fine-grained control over metadata loading, use
447/// [`ArrowReaderMetadata::load`] to load metadata with these options,
448///
449/// See [`ArrowReaderBuilder`] for how to configure how the column data
450/// is then read from the file, including projection and filter pushdown
451///
452/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    ///
    /// Also set to `true` by [`ArrowReaderOptions::with_schema`].
    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
    ///
    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
    supplied_schema: Option<SchemaRef>,

    /// Policy for reading the Parquet `ColumnIndex`; set by
    /// [`ArrowReaderOptions::with_column_index_policy`]
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the Parquet `OffsetIndex`; set by
    /// [`ArrowReaderOptions::with_offset_index_policy`]
    pub(crate) offset_index: PageIndexPolicy,

    /// Options to control reading of Parquet metadata
    metadata_options: ParquetMetaDataOptions,
    /// If encryption is enabled, the file decryption properties can be provided
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Fields describing the virtual columns to include in the output
    ///
    /// Virtual columns are not part of the Parquet schema but are added to the
    /// output by the reader (for example row numbers).
    virtual_columns: Vec<FieldRef>,
}
474
475impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    ///
    /// Equivalent to [`ArrowReaderOptions::default`].
    pub fn new() -> Self {
        Self::default()
    }
480
481    /// Skip decoding the embedded arrow metadata (defaults to `false`)
482    ///
483    /// Parquet files generated by some writers may contain embedded arrow
484    /// schema and metadata.
485    /// This may not be correct or compatible with your system,
486    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
487    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
488        Self {
489            skip_arrow_metadata,
490            ..self
491        }
492    }
493
494    /// Provide a schema hint to use when reading the Parquet file.
495    ///
496    /// If provided, this schema takes precedence over any arrow schema embedded
497    /// in the metadata (see the [`arrow`] documentation for more details).
498    ///
499    /// If the provided schema is not compatible with the data stored in the
500    /// parquet file schema, an error will be returned when constructing the
501    /// builder.
502    ///
503    /// This option is only required if you want to explicitly control the
504    /// conversion of Parquet types to Arrow types, such as casting a column to
505    /// a different type. For example, if you wanted to read an Int64 in
506    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
507    ///
508    /// [`arrow`]: crate::arrow
509    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
510    ///
511    /// # Notes
512    ///
513    /// The provided schema must have the same number of columns as the parquet schema and
514    /// the column names must be the same.
515    ///
516    /// # Example
517    /// ```
518    /// # use std::sync::Arc;
519    /// # use bytes::Bytes;
520    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
521    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
522    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
523    /// # use parquet::arrow::ArrowWriter;
524    /// // Write data - schema is inferred from the data to be Int32
525    /// let mut file = Vec::new();
526    /// let batch = RecordBatch::try_from_iter(vec![
527    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
528    /// ]).unwrap();
529    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
530    /// writer.write(&batch).unwrap();
531    /// writer.close().unwrap();
532    /// let file = Bytes::from(file);
533    ///
534    /// // Read the file back.
535    /// // Supply a schema that interprets the Int32 column as a Timestamp.
536    /// let supplied_schema = Arc::new(Schema::new(vec![
537    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
538    /// ]));
539    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
540    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
541    ///     file.clone(),
542    ///     options
543    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
544    ///
545    /// // Create the reader and read the data using the supplied schema.
546    /// let mut reader = builder.build().unwrap();
547    /// let _batch = reader.next().unwrap().unwrap();
548    /// ```
549    ///
550    /// # Example: Preserving Dictionary Encoding
551    ///
552    /// By default, Parquet string columns are read as `Utf8Array` (or `LargeUtf8Array`),
553    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
554    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
555    ///
556    /// ```
557    /// # use std::sync::Arc;
558    /// # use bytes::Bytes;
559    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
560    /// # use arrow_schema::{DataType, Field, Schema};
561    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
562    /// # use parquet::arrow::ArrowWriter;
563    /// // Write a Parquet file with string data
564    /// let mut file = Vec::new();
565    /// let schema = Arc::new(Schema::new(vec![
566    ///     Field::new("city", DataType::Utf8, false)
567    /// ]));
568    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
569    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
570    ///
571    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
572    /// writer.write(&batch).unwrap();
573    /// writer.close().unwrap();
574    /// let file = Bytes::from(file);
575    ///
576    /// // Read the file back, requesting dictionary encoding preservation
577    /// let dict_schema = Arc::new(Schema::new(vec![
578    ///     Field::new("city", DataType::Dictionary(
579    ///         Box::new(DataType::Int32),
580    ///         Box::new(DataType::Utf8)
581    ///     ), false)
582    /// ]));
583    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
584    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
585    ///     file.clone(),
586    ///     options
587    /// ).unwrap();
588    ///
589    /// let mut reader = builder.build().unwrap();
590    /// let batch = reader.next().unwrap().unwrap();
591    ///
592    /// // The column is now a DictionaryArray
593    /// assert!(matches!(
594    ///     batch.column(0).data_type(),
595    ///     DataType::Dictionary(_, _)
596    /// ));
597    /// ```
598    ///
599    /// **Note**: Dictionary encoding preservation works best when:
600    /// 1. The original column was dictionary encoded (the default for string columns)
601    /// 2. There are a small number of distinct values
602    pub fn with_schema(self, schema: SchemaRef) -> Self {
603        Self {
604            supplied_schema: Some(schema),
605            skip_arrow_metadata: true,
606            ..self
607        }
608    }
609
610    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
611    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
612    ///
613    /// The `PageIndex` can be used to push down predicates to the parquet scan,
614    /// potentially eliminating unnecessary IO, by some query engines.
615    ///
616    /// If this is enabled, [`ParquetMetaData::column_index`] and
617    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
618    /// information is present in the file.
619    ///
620    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
621    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
622    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
623    pub fn with_page_index(self, page_index: bool) -> Self {
624        self.with_page_index_policy(PageIndexPolicy::from(page_index))
625    }
626
627    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
628    ///
629    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
630    /// This method sets the same policy for both. For fine-grained control, use
631    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
632    ///
633    /// See [`Self::with_page_index`] for more details on page indexes.
634    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
635        self.with_column_index_policy(policy)
636            .with_offset_index_policy(policy)
637    }
638
639    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
640    ///
641    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
642    /// for predicate pushdown and page-level pruning.
643    ///
644    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
645    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
646        self.column_index = policy;
647        self
648    }
649
650    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
651    ///
652    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
653    /// efficient page-level skipping and random access within column chunks.
654    ///
655    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
656    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
657        self.offset_index = policy;
658        self
659    }
660
661    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
662    /// footer will be skipped.
663    ///
664    /// This can be used to avoid reparsing the schema from the file when it is
665    /// already known.
666    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
667        self.metadata_options.set_schema(schema);
668        self
669    }
670
671    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
672    /// (defaults to `false`).
673    ///
674    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
675    /// might be desirable.
676    ///
677    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
678    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
679    /// [`encoding_stats`]:
680    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
681    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
682        self.metadata_options.set_encoding_stats_as_mask(val);
683        self
684    }
685
686    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
687    ///
688    /// [`encoding_stats`]:
689    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
690    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
691        self.metadata_options.set_encoding_stats_policy(policy);
692        self
693    }
694
695    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
696    ///
697    /// [`statistics`]:
698    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
699    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
700        self.metadata_options.set_column_stats_policy(policy);
701        self
702    }
703
704    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
705    ///
706    /// [`size_statistics`]:
707    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
708    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
709        self.metadata_options.set_size_stats_policy(policy);
710        self
711    }
712
713    /// Provide the file decryption properties to use when reading encrypted parquet files.
714    ///
715    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
716    #[cfg(feature = "encryption")]
717    pub fn with_file_decryption_properties(
718        self,
719        file_decryption_properties: Arc<FileDecryptionProperties>,
720    ) -> Self {
721        Self {
722            file_decryption_properties: Some(file_decryption_properties),
723            ..self
724        }
725    }
726
727    /// Include virtual columns in the output.
728    ///
729    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
730    ///
731    /// # Example
732    /// ```
733    /// # use std::sync::Arc;
734    /// # use bytes::Bytes;
735    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
736    /// # use arrow_schema::{DataType, Field, Schema};
737    /// # use parquet::arrow::{ArrowWriter, RowNumber};
738    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
739    /// #
740    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
741    /// // Create a simple record batch with some data
742    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
743    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
744    ///
745    /// // Write the batch to an in-memory buffer
746    /// let mut file = Vec::new();
747    /// let mut writer = ArrowWriter::try_new(
748    ///     &mut file,
749    ///     batch.schema(),
750    ///     None
751    /// )?;
752    /// writer.write(&batch)?;
753    /// writer.close()?;
754    /// let file = Bytes::from(file);
755    ///
756    /// // Create a virtual column for row numbers
757    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
758    ///     .with_extension_type(RowNumber));
759    ///
760    /// // Configure options with virtual columns
761    /// let options = ArrowReaderOptions::new()
762    ///     .with_virtual_columns(vec![row_number_field])?;
763    ///
764    /// // Create a reader with the options
765    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
766    ///     file,
767    ///     options
768    /// )?
769    /// .build()?;
770    ///
771    /// // Read the batch - it will include both the original column and the virtual row_number column
772    /// let result_batch = reader.next().unwrap()?;
773    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
774    /// assert_eq!(result_batch.num_rows(), 3);
775    /// #
776    /// # Ok(())
777    /// # }
778    /// ```
779    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
780        // Validate that all fields are virtual columns
781        for field in &virtual_columns {
782            if !is_virtual_column(field) {
783                return Err(ParquetError::General(format!(
784                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
785                    field.name()
786                )));
787            }
788        }
789        Ok(Self {
790            virtual_columns,
791            ..self
792        })
793    }
794
795    #[deprecated(
796        since = "57.2.0",
797        note = "Use `column_index_policy` or `offset_index_policy` instead"
798    )]
799    /// Returns whether page index reading is enabled.
800    ///
801    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
802    ///
803    /// This can be set via [`with_page_index`][Self::with_page_index] or
804    /// [`with_page_index_policy`][Self::with_page_index_policy].
805    pub fn page_index(&self) -> bool {
806        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
807    }
808
    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
    ///
    /// The policy is returned by value, as a copy of the stored setting.
    ///
    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }
816
    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
    ///
    /// The policy is returned by value, as a copy of the stored setting.
    ///
    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
    /// or [`with_page_index_policy`][Self::with_page_index_policy].
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }
824
    /// Retrieve the currently set metadata decoding options.
    ///
    /// These are the options written to by setters such as
    /// [`with_encoding_stats_policy`][Self::with_encoding_stats_policy],
    /// [`with_column_stats_policy`][Self::with_column_stats_policy] and
    /// [`with_size_stats_policy`][Self::with_size_stats_policy].
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }
829
    /// Retrieve the currently set file decryption properties.
    ///
    /// Returns `None` when no properties are currently stored.
    ///
    /// This can be set via
    /// [`with_file_decryption_properties`][Self::with_file_decryption_properties].
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
838}
839
/// The metadata necessary to construct an [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several arcs.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetaData`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet metadata for the file
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema
    pub(crate) schema: SchemaRef,
    /// The root [`ParquetField`] produced alongside the Arrow schema, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}
863
864impl ArrowReaderMetadata {
865    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
866    /// and [`ChunkReader`]
867    ///
868    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
869    /// example of how this can be used
870    ///
871    /// # Notes
872    ///
873    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
874    /// `Self::metadata` is missing the page index, this function will attempt
875    /// to load the page index by making an object store request.
876    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
877        let metadata = ParquetMetaDataReader::new()
878            .with_column_index_policy(options.column_index)
879            .with_offset_index_policy(options.offset_index)
880            .with_metadata_options(Some(options.metadata_options.clone()));
881        #[cfg(feature = "encryption")]
882        let metadata = metadata.with_decryption_properties(
883            options.file_decryption_properties.as_ref().map(Arc::clone),
884        );
885        let metadata = metadata.parse_and_finish(reader)?;
886        Self::try_new(Arc::new(metadata), options)
887    }
888
889    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
890    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
891    ///
892    /// # Notes
893    ///
894    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
895    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
896    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
897        match options.supplied_schema {
898            Some(supplied_schema) => Self::with_supplied_schema(
899                metadata,
900                supplied_schema.clone(),
901                &options.virtual_columns,
902            ),
903            None => {
904                let kv_metadata = match options.skip_arrow_metadata {
905                    true => None,
906                    false => metadata.file_metadata().key_value_metadata(),
907                };
908
909                let (schema, fields) = parquet_to_arrow_schema_and_fields(
910                    metadata.file_metadata().schema_descr(),
911                    ProjectionMask::all(),
912                    kv_metadata,
913                    &options.virtual_columns,
914                )?;
915
916                Ok(Self {
917                    metadata,
918                    schema: Arc::new(schema),
919                    fields: fields.map(Arc::new),
920                })
921            }
922        }
923    }
924
925    fn with_supplied_schema(
926        metadata: Arc<ParquetMetaData>,
927        supplied_schema: SchemaRef,
928        virtual_columns: &[FieldRef],
929    ) -> Result<Self> {
930        let parquet_schema = metadata.file_metadata().schema_descr();
931        let field_levels = parquet_to_arrow_field_levels_with_virtual(
932            parquet_schema,
933            ProjectionMask::all(),
934            Some(supplied_schema.fields()),
935            virtual_columns,
936        )?;
937        let fields = field_levels.fields;
938        let inferred_len = fields.len();
939        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
940        // Ensure the supplied schema has the same number of columns as the parquet schema.
941        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
942        // different lengths, but we check here to be safe.
943        if inferred_len != supplied_len {
944            return Err(arrow_err!(format!(
945                "Incompatible supplied Arrow schema: expected {} columns received {}",
946                inferred_len, supplied_len
947            )));
948        }
949
950        let mut errors = Vec::new();
951
952        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
953
954        for (field1, field2) in field_iter {
955            if field1.data_type() != field2.data_type() {
956                errors.push(format!(
957                    "data type mismatch for field {}: requested {} but found {}",
958                    field1.name(),
959                    field1.data_type(),
960                    field2.data_type()
961                ));
962            }
963            if field1.is_nullable() != field2.is_nullable() {
964                errors.push(format!(
965                    "nullability mismatch for field {}: expected {:?} but found {:?}",
966                    field1.name(),
967                    field1.is_nullable(),
968                    field2.is_nullable()
969                ));
970            }
971            if field1.metadata() != field2.metadata() {
972                errors.push(format!(
973                    "metadata mismatch for field {}: expected {:?} but found {:?}",
974                    field1.name(),
975                    field1.metadata(),
976                    field2.metadata()
977                ));
978            }
979        }
980
981        if !errors.is_empty() {
982            let message = errors.join(", ");
983            return Err(ParquetError::ArrowError(format!(
984                "Incompatible supplied Arrow schema: {message}",
985            )));
986        }
987
988        Ok(Self {
989            metadata,
990            schema: supplied_schema,
991            fields: field_levels.levels.map(Arc::new),
992        })
993    }
994
    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    ///
    /// The reference is to the shared [`Arc`], so callers can clone it to keep
    /// the metadata alive independently of `self`.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }
999
1000    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
1001    pub fn parquet_schema(&self) -> &SchemaDescriptor {
1002        self.metadata.file_metadata().schema_descr()
1003    }
1004
    /// Returns the arrow [`SchemaRef`] for this parquet file
    ///
    /// This is the schema stored at construction: either the supplied schema or the
    /// one derived from the Parquet schema.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
1009}
1010
#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);
1014
1015impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1016    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1017        f.debug_tuple("SyncReader").field(&self.0).finish()
1018    }
1019}
1020
/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
///
/// This is a type alias for [`ArrowReaderBuilder`] parameterized with [`SyncReader`].
///
/// # See Also
/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
/// * [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1028
1029impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1030    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1031    ///
1032    /// ```
1033    /// # use std::sync::Arc;
1034    /// # use bytes::Bytes;
1035    /// # use arrow_array::{Int32Array, RecordBatch};
1036    /// # use arrow_schema::{DataType, Field, Schema};
1037    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1038    /// # use parquet::arrow::ArrowWriter;
1039    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1040    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1041    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1042    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1043    /// # writer.write(&batch).unwrap();
1044    /// # writer.close().unwrap();
1045    /// # let file = Bytes::from(file);
1046    /// // Build the reader from anything that implements `ChunkReader`
1047    /// // such as a `File`, or `Bytes`
1048    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1049    /// // The builder has access to ParquetMetaData such
1050    /// // as the number and layout of row groups
1051    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1052    /// // Call build to create the reader
1053    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1054    /// // Read data
1055    /// while let Some(batch) = reader.next().transpose()? {
1056    ///     println!("Read {} rows", batch.num_rows());
1057    /// }
1058    /// # Ok::<(), parquet::errors::ParquetError>(())
1059    /// ```
1060    pub fn try_new(reader: T) -> Result<Self> {
1061        Self::try_new_with_options(reader, Default::default())
1062    }
1063
1064    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1065    ///
1066    /// Use this method if you want to control the options for reading the
1067    /// [`ParquetMetaData`]
1068    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1069        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1070        Ok(Self::new_with_metadata(reader, metadata))
1071    }
1072
1073    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1074    ///
1075    /// Use this method if you already have [`ParquetMetaData`] for a file.
1076    /// This interface allows:
1077    ///
1078    /// 1. Loading metadata once and using it to create multiple builders with
1079    ///    potentially different settings or run on different threads
1080    ///
1081    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1082    ///    file each time a reader is constructed.
1083    ///
1084    /// See the docs on [`ArrowReaderMetadata`] for more details
1085    ///
1086    /// # Example
1087    /// ```
1088    /// # use std::fs::metadata;
1089    /// # use std::sync::Arc;
1090    /// # use bytes::Bytes;
1091    /// # use arrow_array::{Int32Array, RecordBatch};
1092    /// # use arrow_schema::{DataType, Field, Schema};
1093    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1094    /// # use parquet::arrow::ArrowWriter;
1095    /// #
1096    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1097    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1098    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1099    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1100    /// # writer.write(&batch).unwrap();
1101    /// # writer.close().unwrap();
1102    /// # let file = Bytes::from(file);
1103    /// #
1104    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1105    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1106    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1107    ///
1108    /// // Should be able to read from both in parallel
1109    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1110    /// ```
1111    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1112        Self::new_builder(SyncReader(input), metadata)
1113    }
1114
1115    /// Read bloom filter for a column in a row group
1116    ///
1117    /// Returns `None` if the column does not have a bloom filter
1118    ///
1119    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
1120    pub fn get_row_group_column_bloom_filter(
1121        &self,
1122        row_group_idx: usize,
1123        column_idx: usize,
1124    ) -> Result<Option<Sbbf>> {
1125        let metadata = self.metadata.row_group(row_group_idx);
1126        let column_metadata = metadata.column(column_idx);
1127
1128        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1129            offset
1130                .try_into()
1131                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1132        } else {
1133            return Ok(None);
1134        };
1135
1136        let buffer = match column_metadata.bloom_filter_length() {
1137            Some(length) => self.input.0.get_bytes(offset, length as usize),
1138            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1139        }?;
1140
1141        let (header, bitset_offset) =
1142            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1143
1144        match header.algorithm {
1145            BloomFilterAlgorithm::BLOCK => {
1146                // this match exists to future proof the singleton algorithm enum
1147            }
1148        }
1149        match header.compression {
1150            BloomFilterCompression::UNCOMPRESSED => {
1151                // this match exists to future proof the singleton compression enum
1152            }
1153        }
1154        match header.hash {
1155            BloomFilterHash::XXHASH => {
1156                // this match exists to future proof the singleton hash enum
1157            }
1158        }
1159
1160        let bitset = match column_metadata.bloom_filter_length() {
1161            Some(_) => buffer.slice(
1162                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1163                    - TryInto::<usize>::try_into(offset).unwrap())..,
1164            ),
1165            None => {
1166                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1167                    ParquetError::General("Bloom filter length is invalid".to_string())
1168                })?;
1169                self.input.0.get_bytes(bitset_offset, bitset_length)?
1170            }
1171        };
1172        Ok(Some(Sbbf::new(&bitset)))
1173    }
1174
1175    /// Build a [`ParquetRecordBatchReader`]
1176    ///
1177    /// Note: this will eagerly evaluate any `RowFilter` before returning
1178    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1179        let Self {
1180            input,
1181            metadata,
1182            schema: _,
1183            fields,
1184            batch_size,
1185            row_groups,
1186            projection,
1187            mut filter,
1188            selection,
1189            row_selection_policy,
1190            limit,
1191            offset,
1192            metrics,
1193            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1194            max_predicate_cache_size: _,
1195        } = self;
1196
1197        // Try to avoid allocate large buffer
1198        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1199
1200        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1201
1202        let reader = ReaderRowGroups {
1203            reader: Arc::new(input.0),
1204            metadata,
1205            row_groups,
1206        };
1207
1208        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1209            .with_selection(selection)
1210            .with_row_selection_policy(row_selection_policy);
1211
1212        // Update selection based on any filters
1213        if let Some(filter) = filter.as_mut() {
1214            for predicate in filter.predicates.iter_mut() {
1215                // break early if we have ruled out all rows
1216                if !plan_builder.selects_any() {
1217                    break;
1218                }
1219
1220                let mut cache_projection = predicate.projection().clone();
1221                cache_projection.intersect(&projection);
1222
1223                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1224                    .with_batch_size(batch_size)
1225                    .with_parquet_metadata(&reader.metadata)
1226                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1227
1228                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1229            }
1230        }
1231
1232        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1233            .with_batch_size(batch_size)
1234            .with_parquet_metadata(&reader.metadata)
1235            .build_array_reader(fields.as_deref(), &projection)?;
1236
1237        let read_plan = plan_builder
1238            .limited(reader.num_rows())
1239            .with_offset(offset)
1240            .with_limit(limit)
1241            .build_limited()
1242            .build();
1243
1244        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1245    }
1246}
1247
/// [`RowGroups`] implementation backed by a shared [`ChunkReader`] and the file's
/// [`ParquetMetaData`], restricted to a list of row group indices.
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared handle to the underlying reader; cloned into each page iterator
    reader: Arc<T>,

    /// Metadata of the file being read
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to scan, used to index into `metadata`
    row_groups: Vec<usize>,
}
1255
1256impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1257    fn num_rows(&self) -> usize {
1258        let meta = self.metadata.row_groups();
1259        self.row_groups
1260            .iter()
1261            .map(|x| meta[*x].num_rows() as usize)
1262            .sum()
1263    }
1264
1265    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1266        Ok(Box::new(ReaderPageIterator {
1267            column_idx: i,
1268            reader: self.reader.clone(),
1269            metadata: self.metadata.clone(),
1270            row_groups: self.row_groups.clone().into_iter(),
1271        }))
1272    }
1273
1274    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1275        Box::new(
1276            self.row_groups
1277                .iter()
1278                .map(move |i| self.metadata.row_group(*i)),
1279        )
1280    }
1281
1282    fn metadata(&self) -> &ParquetMetaData {
1283        self.metadata.as_ref()
1284    }
1285}
1286
/// Iterator that yields one page reader per row group for a single column.
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared handle to the underlying reader
    reader: Arc<T>,
    /// Index of the column whose pages are read
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// Metadata of the file being read
    metadata: Arc<ParquetMetaData>,
}
1293
1294impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1295    /// Return the next SerializedPageReader
1296    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1297        let rg = self.metadata.row_group(rg_idx);
1298        let column_chunk_metadata = rg.column(self.column_idx);
1299        let offset_index = self.metadata.offset_index();
1300        // `offset_index` may not exist and `i[rg_idx]` will be empty.
1301        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
1302        let page_locations = offset_index
1303            .filter(|i| !i[rg_idx].is_empty())
1304            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1305        let total_rows = rg.num_rows() as usize;
1306        let reader = self.reader.clone();
1307
1308        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1309            .add_crypto_context(
1310                rg_idx,
1311                self.column_idx,
1312                self.metadata.as_ref(),
1313                column_chunk_metadata,
1314            )
1315    }
1316}
1317
1318impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1319    type Item = Result<Box<dyn PageReader>>;
1320
1321    fn next(&mut self) -> Option<Self::Item> {
1322        let rg_idx = self.row_groups.next()?;
1323        let page_reader = self
1324            .next_page_reader(rg_idx)
1325            .map(|page_reader| Box::new(page_reader) as _);
1326        Some(page_reader)
1327    }
1328}
1329
// Empty impl: nothing beyond what `ReaderPageIterator` already provides is defined here
// (presumably the `Iterator` impl above satisfies `PageIterator`'s requirements).
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1331
/// Reads Parquet data as Arrow [`RecordBatch`]es
///
/// This struct implements the [`RecordBatchReader`] trait and is an
/// `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]es.
///
/// Typically, either reads from a file or an in memory buffer [`Bytes`]
///
/// Created by [`ParquetRecordBatchReaderBuilder`]
///
/// [`Bytes`]: bytes::Bytes
pub struct ParquetRecordBatchReader {
    /// Source of decoded column data; rows are skipped, read and consumed through it
    array_reader: Box<dyn ArrayReader>,
    /// Arrow schema associated with this reader
    schema: SchemaRef,
    /// Plan that supplies the row selection cursor used while reading
    read_plan: ReadPlan,
}
1347
1348impl Debug for ParquetRecordBatchReader {
1349    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1350        f.debug_struct("ParquetRecordBatchReader")
1351            .field("array_reader", &"...")
1352            .field("schema", &self.schema)
1353            .field("read_plan", &self.read_plan)
1354            .finish()
1355    }
1356}
1357
1358impl Iterator for ParquetRecordBatchReader {
1359    type Item = Result<RecordBatch, ArrowError>;
1360
1361    fn next(&mut self) -> Option<Self::Item> {
1362        self.next_inner()
1363            .map_err(|arrow_err| arrow_err.into())
1364            .transpose()
1365    }
1366}
1367
1368impl ParquetRecordBatchReader {
1369    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1370    /// has reached the end of the file.
1371    ///
1372    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1373    /// simplify error handling with `?`
1374    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1375        let mut read_records = 0;
1376        let batch_size = self.batch_size();
1377        if batch_size == 0 {
1378            return Ok(None);
1379        }
1380        match self.read_plan.row_selection_cursor_mut() {
1381            RowSelectionCursor::Mask(mask_cursor) => {
1382                // Stream the record batch reader using contiguous segments of the selection
1383                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1384                while !mask_cursor.is_empty() {
1385                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1386                        return Ok(None);
1387                    };
1388
1389                    if mask_chunk.initial_skip > 0 {
1390                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1391                        if skipped != mask_chunk.initial_skip {
1392                            return Err(general_err!(
1393                                "failed to skip rows, expected {}, got {}",
1394                                mask_chunk.initial_skip,
1395                                skipped
1396                            ));
1397                        }
1398                    }
1399
1400                    if mask_chunk.chunk_rows == 0 {
1401                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1402                            return Ok(None);
1403                        }
1404                        continue;
1405                    }
1406
1407                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1408
1409                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1410                    if read == 0 {
1411                        return Err(general_err!(
1412                            "reached end of column while expecting {} rows",
1413                            mask_chunk.chunk_rows
1414                        ));
1415                    }
1416                    if read != mask_chunk.chunk_rows {
1417                        return Err(general_err!(
1418                            "insufficient rows read from array reader - expected {}, got {}",
1419                            mask_chunk.chunk_rows,
1420                            read
1421                        ));
1422                    }
1423
1424                    let array = self.array_reader.consume_batch()?;
1425                    // The column reader exposes the projection as a struct array; convert this
1426                    // into a record batch before applying the boolean filter mask.
1427                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1428                        ArrowError::ParquetError(
1429                            "Struct array reader should return struct array".to_string(),
1430                        )
1431                    })?;
1432
1433                    let filtered_batch =
1434                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1435
1436                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1437                        return Err(general_err!(
1438                            "filtered rows mismatch selection - expected {}, got {}",
1439                            mask_chunk.selected_rows,
1440                            filtered_batch.num_rows()
1441                        ));
1442                    }
1443
1444                    if filtered_batch.num_rows() == 0 {
1445                        continue;
1446                    }
1447
1448                    return Ok(Some(filtered_batch));
1449                }
1450            }
1451            RowSelectionCursor::Selectors(selectors_cursor) => {
1452                while read_records < batch_size && !selectors_cursor.is_empty() {
1453                    let front = selectors_cursor.next_selector();
1454                    if front.skip {
1455                        let skipped = self.array_reader.skip_records(front.row_count)?;
1456
1457                        if skipped != front.row_count {
1458                            return Err(general_err!(
1459                                "failed to skip rows, expected {}, got {}",
1460                                front.row_count,
1461                                skipped
1462                            ));
1463                        }
1464                        continue;
1465                    }
1466
                    // A `RowSelector` with `row_count == 0` would otherwise be interpreted as the
                    // end of the reader, so such entries are skipped.
                    // See https://github.com/apache/arrow-rs/issues/2669
1469                    if front.row_count == 0 {
1470                        continue;
1471                    }
1472
1473                    // try to read record
1474                    let need_read = batch_size - read_records;
1475                    let to_read = match front.row_count.checked_sub(need_read) {
1476                        Some(remaining) if remaining != 0 => {
1477                            // if page row count less than batch_size we must set batch size to page row count.
1478                            // add check avoid dead loop
1479                            selectors_cursor.return_selector(RowSelector::select(remaining));
1480                            need_read
1481                        }
1482                        _ => front.row_count,
1483                    };
1484                    match self.array_reader.read_records(to_read)? {
1485                        0 => break,
1486                        rec => read_records += rec,
1487                    };
1488                }
1489            }
1490            RowSelectionCursor::All => {
1491                self.array_reader.read_records(batch_size)?;
1492            }
1493        };
1494
1495        let array = self.array_reader.consume_batch()?;
1496        let struct_array = array.as_struct_opt().ok_or_else(|| {
1497            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1498        })?;
1499
1500        Ok(if struct_array.len() > 0 {
1501            Some(RecordBatch::from(struct_array))
1502        } else {
1503            None
1504        })
1505    }
1506}
1507
1508impl RecordBatchReader for ParquetRecordBatchReader {
1509    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1510    ///
1511    /// Note that the schema metadata will be stripped here. See
1512    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1513    fn schema(&self) -> SchemaRef {
1514        self.schema.clone()
1515    }
1516}
1517
1518impl ParquetRecordBatchReader {
1519    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1520    ///
1521    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1522    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1523        ParquetRecordBatchReaderBuilder::try_new(reader)?
1524            .with_batch_size(batch_size)
1525            .build()
1526    }
1527
1528    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1529    ///
1530    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1531    /// higher-level interface for reading parquet data from a file
1532    pub fn try_new_with_row_groups(
1533        levels: &FieldLevels,
1534        row_groups: &dyn RowGroups,
1535        batch_size: usize,
1536        selection: Option<RowSelection>,
1537    ) -> Result<Self> {
1538        // note metrics are not supported in this API
1539        let metrics = ArrowReaderMetrics::disabled();
1540        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1541            .with_batch_size(batch_size)
1542            .with_parquet_metadata(row_groups.metadata())
1543            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1544
1545        let read_plan = ReadPlanBuilder::new(batch_size)
1546            .with_selection(selection)
1547            .build();
1548
1549        Ok(Self {
1550            array_reader,
1551            schema: Arc::new(Schema::new(levels.fields.clone())),
1552            read_plan,
1553        })
1554    }
1555
1556    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1557    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1558    /// all rows will be returned
1559    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1560        let schema = match array_reader.get_data_type() {
1561            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1562            _ => unreachable!("Struct array reader's data type is not struct!"),
1563        };
1564
1565        Self {
1566            array_reader,
1567            schema: Arc::new(schema),
1568            read_plan,
1569        }
1570    }
1571
1572    #[inline(always)]
1573    pub(crate) fn batch_size(&self) -> usize {
1574        self.read_plan.batch_size()
1575    }
1576}
1577
1578#[cfg(test)]
1579pub(crate) mod tests {
1580    use std::cmp::min;
1581    use std::collections::{HashMap, VecDeque};
1582    use std::fmt::Formatter;
1583    use std::fs::File;
1584    use std::io::Seek;
1585    use std::path::PathBuf;
1586    use std::sync::Arc;
1587
1588    use rand::rngs::StdRng;
1589    use rand::{Rng, RngCore, SeedableRng, random, rng};
1590    use tempfile::tempfile;
1591
1592    use crate::arrow::arrow_reader::{
1593        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1594        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1595    };
1596    use crate::arrow::schema::{
1597        add_encoded_arrow_schema_to_metadata,
1598        virtual_type::{RowGroupIndex, RowNumber},
1599    };
1600    use crate::arrow::{ArrowWriter, ProjectionMask};
1601    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1602    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1603    use crate::data_type::{
1604        BoolType, ByteArray, ByteArrayType, DataType, DoubleType, FixedLenByteArray,
1605        FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type,
1606    };
1607    use crate::errors::Result;
1608    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1609    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1610    use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1611    use crate::schema::parser::parse_message_type;
1612    use crate::schema::types::{Type, TypePtr};
1613    use crate::util::test_common::rand_gen::RandGen;
1614    use arrow_array::builder::*;
1615    use arrow_array::cast::AsArray;
1616    use arrow_array::types::{
1617        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1618        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1619        Time64MicrosecondType,
1620    };
1621    use arrow_array::*;
1622    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1623    use arrow_data::{ArrayData, ArrayDataBuilder};
1624    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1625    use arrow_select::concat::concat_batches;
1626    use bytes::Bytes;
1627    use half::f16;
1628    use num_traits::PrimInt;
1629
1630    #[test]
1631    fn test_arrow_reader_all_columns() {
1632        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1633
1634        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1635        let original_schema = Arc::clone(builder.schema());
1636        let reader = builder.build().unwrap();
1637
1638        // Verify that the schema was correctly parsed
1639        assert_eq!(original_schema.fields(), reader.schema().fields());
1640    }
1641
1642    #[test]
1643    fn test_reuse_schema() {
1644        let file = get_test_file("parquet/alltypes-java.parquet");
1645
1646        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1647        let expected = builder.metadata;
1648        let schema = expected.file_metadata().schema_descr_ptr();
1649
1650        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1651        let builder =
1652            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1653
1654        // Verify that the metadata matches
1655        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1656    }
1657
1658    #[test]
1659    fn test_page_encoding_stats_mask() {
1660        let testdata = arrow::util::test_util::parquet_test_data();
1661        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1662        let file = File::open(path).unwrap();
1663
1664        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1665        let builder =
1666            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1667
1668        let row_group_metadata = builder.metadata.row_group(0);
1669
1670        // test page encoding stats
1671        let page_encoding_stats = row_group_metadata
1672            .column(0)
1673            .page_encoding_stats_mask()
1674            .unwrap();
1675        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1676        let page_encoding_stats = row_group_metadata
1677            .column(2)
1678            .page_encoding_stats_mask()
1679            .unwrap();
1680        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1681    }
1682
1683    #[test]
1684    fn test_stats_stats_skipped() {
1685        let testdata = arrow::util::test_util::parquet_test_data();
1686        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1687        let file = File::open(path).unwrap();
1688
1689        // test skipping all
1690        let arrow_options = ArrowReaderOptions::new()
1691            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1692            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1693        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1694            file.try_clone().unwrap(),
1695            arrow_options,
1696        )
1697        .unwrap();
1698
1699        let row_group_metadata = builder.metadata.row_group(0);
1700        for column in row_group_metadata.columns() {
1701            assert!(column.page_encoding_stats().is_none());
1702            assert!(column.page_encoding_stats_mask().is_none());
1703            assert!(column.statistics().is_none());
1704        }
1705
1706        // test skipping all but one column and converting to mask
1707        let arrow_options = ArrowReaderOptions::new()
1708            .with_encoding_stats_as_mask(true)
1709            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1710            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1711        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1712            file.try_clone().unwrap(),
1713            arrow_options,
1714        )
1715        .unwrap();
1716
1717        let row_group_metadata = builder.metadata.row_group(0);
1718        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1719            assert!(column.page_encoding_stats().is_none());
1720            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1721            assert_eq!(column.statistics().is_some(), idx == 0);
1722        }
1723    }
1724
1725    #[test]
1726    fn test_size_stats_stats_skipped() {
1727        let testdata = arrow::util::test_util::parquet_test_data();
1728        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1729        let file = File::open(path).unwrap();
1730
1731        // test skipping all
1732        let arrow_options =
1733            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1734        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1735            file.try_clone().unwrap(),
1736            arrow_options,
1737        )
1738        .unwrap();
1739
1740        let row_group_metadata = builder.metadata.row_group(0);
1741        for column in row_group_metadata.columns() {
1742            assert!(column.repetition_level_histogram().is_none());
1743            assert!(column.definition_level_histogram().is_none());
1744            assert!(column.unencoded_byte_array_data_bytes().is_none());
1745        }
1746
1747        // test skipping all but one column and converting to mask
1748        let arrow_options = ArrowReaderOptions::new()
1749            .with_encoding_stats_as_mask(true)
1750            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1751        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1752            file.try_clone().unwrap(),
1753            arrow_options,
1754        )
1755        .unwrap();
1756
1757        let row_group_metadata = builder.metadata.row_group(0);
1758        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1759            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1760            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1761            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1762        }
1763    }
1764
1765    #[test]
1766    fn test_arrow_reader_single_column() {
1767        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1768
1769        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1770        let original_schema = Arc::clone(builder.schema());
1771
1772        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1773        let reader = builder.with_projection(mask).build().unwrap();
1774
1775        // Verify that the schema was correctly parsed
1776        assert_eq!(1, reader.schema().fields().len());
1777        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1778    }
1779
1780    #[test]
1781    fn test_arrow_reader_single_column_by_name() {
1782        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1783
1784        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1785        let original_schema = Arc::clone(builder.schema());
1786
1787        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1788        let reader = builder.with_projection(mask).build().unwrap();
1789
1790        // Verify that the schema was correctly parsed
1791        assert_eq!(1, reader.schema().fields().len());
1792        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1793    }
1794
1795    #[test]
1796    fn test_null_column_reader_test() {
1797        let mut file = tempfile::tempfile().unwrap();
1798
1799        let schema = "
1800            message message {
1801                OPTIONAL INT32 int32;
1802            }
1803        ";
1804        let schema = Arc::new(parse_message_type(schema).unwrap());
1805
1806        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1807        generate_single_column_file_with_data::<Int32Type>(
1808            &[vec![], vec![]],
1809            Some(&def_levels),
1810            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1811            schema,
1812            Some(Field::new("int32", ArrowDataType::Null, true)),
1813            &Default::default(),
1814        )
1815        .unwrap();
1816
1817        file.rewind().unwrap();
1818
1819        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1820        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1821
1822        assert_eq!(batches.len(), 4);
1823        for batch in &batches[0..3] {
1824            assert_eq!(batch.num_rows(), 2);
1825            assert_eq!(batch.num_columns(), 1);
1826            assert_eq!(batch.column(0).null_count(), 2);
1827        }
1828
1829        assert_eq!(batches[3].num_rows(), 1);
1830        assert_eq!(batches[3].num_columns(), 1);
1831        assert_eq!(batches[3].column(0).null_count(), 1);
1832    }
1833
1834    #[test]
1835    fn test_primitive_single_column_reader_test() {
1836        run_single_column_reader_tests::<BoolType, _, BoolType>(
1837            2,
1838            ConvertedType::NONE,
1839            None,
1840            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1841            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1842        );
1843        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1844            2,
1845            ConvertedType::NONE,
1846            None,
1847            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1848            &[
1849                Encoding::PLAIN,
1850                Encoding::RLE_DICTIONARY,
1851                Encoding::DELTA_BINARY_PACKED,
1852                Encoding::BYTE_STREAM_SPLIT,
1853            ],
1854        );
1855        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1856            2,
1857            ConvertedType::NONE,
1858            None,
1859            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1860            &[
1861                Encoding::PLAIN,
1862                Encoding::RLE_DICTIONARY,
1863                Encoding::DELTA_BINARY_PACKED,
1864                Encoding::BYTE_STREAM_SPLIT,
1865            ],
1866        );
1867        run_single_column_reader_tests::<FloatType, _, FloatType>(
1868            2,
1869            ConvertedType::NONE,
1870            None,
1871            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1872            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1873        );
1874    }
1875
1876    #[test]
1877    fn test_unsigned_primitive_single_column_reader_test() {
1878        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1879            2,
1880            ConvertedType::UINT_32,
1881            Some(ArrowDataType::UInt32),
1882            |vals| {
1883                Arc::new(UInt32Array::from_iter(
1884                    vals.iter().map(|x| x.map(|x| x as u32)),
1885                ))
1886            },
1887            &[
1888                Encoding::PLAIN,
1889                Encoding::RLE_DICTIONARY,
1890                Encoding::DELTA_BINARY_PACKED,
1891            ],
1892        );
1893        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1894            2,
1895            ConvertedType::UINT_64,
1896            Some(ArrowDataType::UInt64),
1897            |vals| {
1898                Arc::new(UInt64Array::from_iter(
1899                    vals.iter().map(|x| x.map(|x| x as u64)),
1900                ))
1901            },
1902            &[
1903                Encoding::PLAIN,
1904                Encoding::RLE_DICTIONARY,
1905                Encoding::DELTA_BINARY_PACKED,
1906            ],
1907        );
1908    }
1909
1910    #[test]
1911    fn test_unsigned_roundtrip() {
1912        let schema = Arc::new(Schema::new(vec![
1913            Field::new("uint32", ArrowDataType::UInt32, true),
1914            Field::new("uint64", ArrowDataType::UInt64, true),
1915        ]));
1916
1917        let mut buf = Vec::with_capacity(1024);
1918        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1919
1920        let original = RecordBatch::try_new(
1921            schema,
1922            vec![
1923                Arc::new(UInt32Array::from_iter_values([
1924                    0,
1925                    i32::MAX as u32,
1926                    u32::MAX,
1927                ])),
1928                Arc::new(UInt64Array::from_iter_values([
1929                    0,
1930                    i64::MAX as u64,
1931                    u64::MAX,
1932                ])),
1933            ],
1934        )
1935        .unwrap();
1936
1937        writer.write(&original).unwrap();
1938        writer.close().unwrap();
1939
1940        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1941        let ret = reader.next().unwrap().unwrap();
1942        assert_eq!(ret, original);
1943
1944        // Check they can be downcast to the correct type
1945        ret.column(0)
1946            .as_any()
1947            .downcast_ref::<UInt32Array>()
1948            .unwrap();
1949
1950        ret.column(1)
1951            .as_any()
1952            .downcast_ref::<UInt64Array>()
1953            .unwrap();
1954    }
1955
1956    #[test]
1957    fn test_float16_roundtrip() -> Result<()> {
1958        let schema = Arc::new(Schema::new(vec![
1959            Field::new("float16", ArrowDataType::Float16, false),
1960            Field::new("float16-nullable", ArrowDataType::Float16, true),
1961        ]));
1962
1963        let mut buf = Vec::with_capacity(1024);
1964        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1965
1966        let original = RecordBatch::try_new(
1967            schema,
1968            vec![
1969                Arc::new(Float16Array::from_iter_values([
1970                    f16::EPSILON,
1971                    f16::MIN,
1972                    f16::MAX,
1973                    f16::NAN,
1974                    f16::INFINITY,
1975                    f16::NEG_INFINITY,
1976                    f16::ONE,
1977                    f16::NEG_ONE,
1978                    f16::ZERO,
1979                    f16::NEG_ZERO,
1980                    f16::E,
1981                    f16::PI,
1982                    f16::FRAC_1_PI,
1983                ])),
1984                Arc::new(Float16Array::from(vec![
1985                    None,
1986                    None,
1987                    None,
1988                    Some(f16::NAN),
1989                    Some(f16::INFINITY),
1990                    Some(f16::NEG_INFINITY),
1991                    None,
1992                    None,
1993                    None,
1994                    None,
1995                    None,
1996                    None,
1997                    Some(f16::FRAC_1_PI),
1998                ])),
1999            ],
2000        )?;
2001
2002        writer.write(&original)?;
2003        writer.close()?;
2004
2005        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2006        let ret = reader.next().unwrap()?;
2007        assert_eq!(ret, original);
2008
2009        // Ensure can be downcast to the correct type
2010        ret.column(0).as_primitive::<Float16Type>();
2011        ret.column(1).as_primitive::<Float16Type>();
2012
2013        Ok(())
2014    }
2015
2016    #[test]
2017    fn test_time_utc_roundtrip() -> Result<()> {
2018        let schema = Arc::new(Schema::new(vec![
2019            Field::new(
2020                "time_millis",
2021                ArrowDataType::Time32(TimeUnit::Millisecond),
2022                true,
2023            )
2024            .with_metadata(HashMap::from_iter(vec![(
2025                "adjusted_to_utc".to_string(),
2026                "".to_string(),
2027            )])),
2028            Field::new(
2029                "time_micros",
2030                ArrowDataType::Time64(TimeUnit::Microsecond),
2031                true,
2032            )
2033            .with_metadata(HashMap::from_iter(vec![(
2034                "adjusted_to_utc".to_string(),
2035                "".to_string(),
2036            )])),
2037        ]));
2038
2039        let mut buf = Vec::with_capacity(1024);
2040        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2041
2042        let original = RecordBatch::try_new(
2043            schema,
2044            vec![
2045                Arc::new(Time32MillisecondArray::from(vec![
2046                    Some(-1),
2047                    Some(0),
2048                    Some(86_399_000),
2049                    Some(86_400_000),
2050                    Some(86_401_000),
2051                    None,
2052                ])),
2053                Arc::new(Time64MicrosecondArray::from(vec![
2054                    Some(-1),
2055                    Some(0),
2056                    Some(86_399 * 1_000_000),
2057                    Some(86_400 * 1_000_000),
2058                    Some(86_401 * 1_000_000),
2059                    None,
2060                ])),
2061            ],
2062        )?;
2063
2064        writer.write(&original)?;
2065        writer.close()?;
2066
2067        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2068        let ret = reader.next().unwrap()?;
2069        assert_eq!(ret, original);
2070
2071        // Ensure can be downcast to the correct type
2072        ret.column(0).as_primitive::<Time32MillisecondType>();
2073        ret.column(1).as_primitive::<Time64MicrosecondType>();
2074
2075        Ok(())
2076    }
2077
2078    #[test]
2079    fn test_date32_roundtrip() -> Result<()> {
2080        use arrow_array::Date32Array;
2081
2082        let schema = Arc::new(Schema::new(vec![Field::new(
2083            "date32",
2084            ArrowDataType::Date32,
2085            false,
2086        )]));
2087
2088        let mut buf = Vec::with_capacity(1024);
2089
2090        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2091
2092        let original = RecordBatch::try_new(
2093            schema,
2094            vec![Arc::new(Date32Array::from(vec![
2095                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2096            ]))],
2097        )?;
2098
2099        writer.write(&original)?;
2100        writer.close()?;
2101
2102        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2103        let ret = reader.next().unwrap()?;
2104        assert_eq!(ret, original);
2105
2106        // Ensure can be downcast to the correct type
2107        ret.column(0).as_primitive::<Date32Type>();
2108
2109        Ok(())
2110    }
2111
2112    #[test]
2113    fn test_date64_roundtrip() -> Result<()> {
2114        use arrow_array::Date64Array;
2115
2116        let schema = Arc::new(Schema::new(vec![
2117            Field::new("small-date64", ArrowDataType::Date64, false),
2118            Field::new("big-date64", ArrowDataType::Date64, false),
2119            Field::new("invalid-date64", ArrowDataType::Date64, false),
2120        ]));
2121
2122        let mut default_buf = Vec::with_capacity(1024);
2123        let mut coerce_buf = Vec::with_capacity(1024);
2124
2125        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2126
2127        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2128        let mut coerce_writer =
2129            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2130
2131        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2132
2133        let original = RecordBatch::try_new(
2134            schema,
2135            vec![
2136                // small-date64
2137                Arc::new(Date64Array::from(vec![
2138                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2139                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2140                    0,
2141                    1_000 * NUM_MILLISECONDS_IN_DAY,
2142                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2143                ])),
2144                // big-date64
2145                Arc::new(Date64Array::from(vec![
2146                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2147                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2148                    0,
2149                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2150                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2151                ])),
2152                // invalid-date64
2153                Arc::new(Date64Array::from(vec![
2154                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2155                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2156                    1,
2157                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2158                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2159                ])),
2160            ],
2161        )?;
2162
2163        default_writer.write(&original)?;
2164        coerce_writer.write(&original)?;
2165
2166        default_writer.close()?;
2167        coerce_writer.close()?;
2168
2169        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2170        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2171
2172        let default_ret = default_reader.next().unwrap()?;
2173        let coerce_ret = coerce_reader.next().unwrap()?;
2174
2175        // Roundtrip should be successful when default writer used
2176        assert_eq!(default_ret, original);
2177
2178        // Only small-date64 should roundtrip successfully when coerce_types writer is used
2179        assert_eq!(coerce_ret.column(0), original.column(0));
2180        assert_ne!(coerce_ret.column(1), original.column(1));
2181        assert_ne!(coerce_ret.column(2), original.column(2));
2182
2183        // Ensure both can be downcast to the correct type
2184        default_ret.column(0).as_primitive::<Date64Type>();
2185        coerce_ret.column(0).as_primitive::<Date64Type>();
2186
2187        Ok(())
2188    }
2189    struct RandFixedLenGen {}
2190
2191    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2192        fn r#gen(len: i32) -> FixedLenByteArray {
2193            let mut v = vec![0u8; len as usize];
2194            rng().fill_bytes(&mut v);
2195            ByteArray::from(v).into()
2196        }
2197    }
2198
2199    #[test]
2200    fn test_fixed_length_binary_column_reader() {
2201        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2202            20,
2203            ConvertedType::NONE,
2204            None,
2205            |vals| {
2206                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2207                for val in vals {
2208                    match val {
2209                        Some(b) => builder.append_value(b).unwrap(),
2210                        None => builder.append_null(),
2211                    }
2212                }
2213                Arc::new(builder.finish())
2214            },
2215            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2216        );
2217    }
2218
2219    #[test]
2220    fn test_interval_day_time_column_reader() {
2221        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2222            12,
2223            ConvertedType::INTERVAL,
2224            None,
2225            |vals| {
2226                Arc::new(
2227                    vals.iter()
2228                        .map(|x| {
2229                            x.as_ref().map(|b| IntervalDayTime {
2230                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2231                                milliseconds: i32::from_le_bytes(
2232                                    b.as_ref()[8..12].try_into().unwrap(),
2233                                ),
2234                            })
2235                        })
2236                        .collect::<IntervalDayTimeArray>(),
2237                )
2238            },
2239            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2240        );
2241    }
2242
2243    #[test]
2244    fn test_int96_single_column_reader_test() {
2245        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2246
2247        type TypeHintAndConversionFunction =
2248            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2249
2250        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2251            // Test without a specified ArrowType hint.
2252            (None, |vals: &[Option<Int96>]| {
2253                Arc::new(TimestampNanosecondArray::from_iter(
2254                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2255                )) as ArrayRef
2256            }),
2257            // Test other TimeUnits as ArrowType hints.
2258            (
2259                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2260                |vals: &[Option<Int96>]| {
2261                    Arc::new(TimestampSecondArray::from_iter(
2262                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2263                    )) as ArrayRef
2264                },
2265            ),
2266            (
2267                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2268                |vals: &[Option<Int96>]| {
2269                    Arc::new(TimestampMillisecondArray::from_iter(
2270                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2271                    )) as ArrayRef
2272                },
2273            ),
2274            (
2275                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2276                |vals: &[Option<Int96>]| {
2277                    Arc::new(TimestampMicrosecondArray::from_iter(
2278                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2279                    )) as ArrayRef
2280                },
2281            ),
2282            (
2283                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2284                |vals: &[Option<Int96>]| {
2285                    Arc::new(TimestampNanosecondArray::from_iter(
2286                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2287                    )) as ArrayRef
2288                },
2289            ),
2290            // Test another timezone with TimeUnit as ArrowType hints.
2291            (
2292                Some(ArrowDataType::Timestamp(
2293                    TimeUnit::Second,
2294                    Some(Arc::from("-05:00")),
2295                )),
2296                |vals: &[Option<Int96>]| {
2297                    Arc::new(
2298                        TimestampSecondArray::from_iter(
2299                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2300                        )
2301                        .with_timezone("-05:00"),
2302                    ) as ArrayRef
2303                },
2304            ),
2305        ];
2306
2307        resolutions.iter().for_each(|(arrow_type, converter)| {
2308            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2309                2,
2310                ConvertedType::NONE,
2311                arrow_type.clone(),
2312                converter,
2313                encodings,
2314            );
2315        })
2316    }
2317
2318    #[test]
2319    fn test_int96_from_spark_file_with_provided_schema() {
2320        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2321        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2322        // microsecond resolution for the Parquet reader to interpret these values correctly.
2323        use arrow_schema::DataType::Timestamp;
2324        let test_data = arrow::util::test_util::parquet_test_data();
2325        let path = format!("{test_data}/int96_from_spark.parquet");
2326        let file = File::open(path).unwrap();
2327
2328        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2329            "a",
2330            Timestamp(TimeUnit::Microsecond, None),
2331            true,
2332        )]));
2333        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2334
2335        let mut record_reader =
2336            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2337                .unwrap()
2338                .build()
2339                .unwrap();
2340
2341        let batch = record_reader.next().unwrap().unwrap();
2342        assert_eq!(batch.num_columns(), 1);
2343        let column = batch.column(0);
2344        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2345
2346        let expected = Arc::new(Int64Array::from(vec![
2347            Some(1704141296123456),
2348            Some(1704070800000000),
2349            Some(253402225200000000),
2350            Some(1735599600000000),
2351            None,
2352            Some(9089380393200000000),
2353        ]));
2354
2355        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2356        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2357        // anyway, so this should be a zero-copy non-modifying cast.
2358
2359        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2360        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2361
2362        assert_eq!(casted_timestamps.len(), expected.len());
2363
2364        casted_timestamps
2365            .iter()
2366            .zip(expected.iter())
2367            .for_each(|(lhs, rhs)| {
2368                assert_eq!(lhs, rhs);
2369            });
2370    }
2371
2372    #[test]
2373    fn test_int96_from_spark_file_without_provided_schema() {
2374        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2375        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2376        // values when read as nanosecond resolution overflow and result in garbage values.
2377        use arrow_schema::DataType::Timestamp;
2378        let test_data = arrow::util::test_util::parquet_test_data();
2379        let path = format!("{test_data}/int96_from_spark.parquet");
2380        let file = File::open(path).unwrap();
2381
2382        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2383            .unwrap()
2384            .build()
2385            .unwrap();
2386
2387        let batch = record_reader.next().unwrap().unwrap();
2388        assert_eq!(batch.num_columns(), 1);
2389        let column = batch.column(0);
2390        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2391
2392        let expected = Arc::new(Int64Array::from(vec![
2393            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2394            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2395            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2396            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2397            None,
2398            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2399        ]));
2400
2401        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2402        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2403        // anyway, so this should be a zero-copy non-modifying cast.
2404
2405        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2406        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2407
2408        assert_eq!(casted_timestamps.len(), expected.len());
2409
2410        casted_timestamps
2411            .iter()
2412            .zip(expected.iter())
2413            .for_each(|(lhs, rhs)| {
2414                assert_eq!(lhs, rhs);
2415            });
2416    }
2417
2418    struct RandUtf8Gen {}
2419
2420    impl RandGen<ByteArrayType> for RandUtf8Gen {
2421        fn r#gen(len: i32) -> ByteArray {
2422            Int32Type::r#gen(len).to_string().as_str().into()
2423        }
2424    }
2425
2426    #[test]
2427    fn test_utf8_single_column_reader_test() {
2428        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2429            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2430                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2431            })))
2432        }
2433
2434        let encodings = &[
2435            Encoding::PLAIN,
2436            Encoding::RLE_DICTIONARY,
2437            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2438            Encoding::DELTA_BYTE_ARRAY,
2439        ];
2440
2441        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2442            2,
2443            ConvertedType::NONE,
2444            None,
2445            |vals| {
2446                Arc::new(BinaryArray::from_iter(
2447                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2448                ))
2449            },
2450            encodings,
2451        );
2452
2453        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2454            2,
2455            ConvertedType::UTF8,
2456            None,
2457            string_converter::<i32>,
2458            encodings,
2459        );
2460
2461        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2462            2,
2463            ConvertedType::UTF8,
2464            Some(ArrowDataType::Utf8),
2465            string_converter::<i32>,
2466            encodings,
2467        );
2468
2469        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2470            2,
2471            ConvertedType::UTF8,
2472            Some(ArrowDataType::LargeUtf8),
2473            string_converter::<i64>,
2474            encodings,
2475        );
2476
2477        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2478        for key in &small_key_types {
2479            for encoding in encodings {
2480                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2481                opts.encoding = *encoding;
2482
2483                let data_type =
2484                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2485
2486                // Cannot run full test suite as keys overflow, run small test instead
2487                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2488                    opts,
2489                    2,
2490                    ConvertedType::UTF8,
2491                    Some(data_type.clone()),
2492                    move |vals| {
2493                        let vals = string_converter::<i32>(vals);
2494                        arrow::compute::cast(&vals, &data_type).unwrap()
2495                    },
2496                );
2497            }
2498        }
2499
2500        let key_types = [
2501            ArrowDataType::Int16,
2502            ArrowDataType::UInt16,
2503            ArrowDataType::Int32,
2504            ArrowDataType::UInt32,
2505            ArrowDataType::Int64,
2506            ArrowDataType::UInt64,
2507        ];
2508
2509        for key in &key_types {
2510            let data_type =
2511                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2512
2513            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2514                2,
2515                ConvertedType::UTF8,
2516                Some(data_type.clone()),
2517                move |vals| {
2518                    let vals = string_converter::<i32>(vals);
2519                    arrow::compute::cast(&vals, &data_type).unwrap()
2520                },
2521                encodings,
2522            );
2523
2524            let data_type = ArrowDataType::Dictionary(
2525                Box::new(key.clone()),
2526                Box::new(ArrowDataType::LargeUtf8),
2527            );
2528
2529            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2530                2,
2531                ConvertedType::UTF8,
2532                Some(data_type.clone()),
2533                move |vals| {
2534                    let vals = string_converter::<i64>(vals);
2535                    arrow::compute::cast(&vals, &data_type).unwrap()
2536                },
2537                encodings,
2538            );
2539        }
2540    }
2541
2542    #[test]
2543    fn test_decimal_nullable_struct() {
2544        let decimals = Decimal256Array::from_iter_values(
2545            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2546        );
2547
2548        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2549            "decimals",
2550            decimals.data_type().clone(),
2551            false,
2552        )])))
2553        .len(8)
2554        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2555        .child_data(vec![decimals.into_data()])
2556        .build()
2557        .unwrap();
2558
2559        let written =
2560            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2561                .unwrap();
2562
2563        let mut buffer = Vec::with_capacity(1024);
2564        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2565        writer.write(&written).unwrap();
2566        writer.close().unwrap();
2567
2568        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2569            .unwrap()
2570            .collect::<Result<Vec<_>, _>>()
2571            .unwrap();
2572
2573        assert_eq!(&written.slice(0, 3), &read[0]);
2574        assert_eq!(&written.slice(3, 3), &read[1]);
2575        assert_eq!(&written.slice(6, 2), &read[2]);
2576    }
2577
2578    #[test]
2579    fn test_int32_nullable_struct() {
2580        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2581        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2582            "int32",
2583            int32.data_type().clone(),
2584            false,
2585        )])))
2586        .len(8)
2587        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2588        .child_data(vec![int32.into_data()])
2589        .build()
2590        .unwrap();
2591
2592        let written =
2593            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2594                .unwrap();
2595
2596        let mut buffer = Vec::with_capacity(1024);
2597        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2598        writer.write(&written).unwrap();
2599        writer.close().unwrap();
2600
2601        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2602            .unwrap()
2603            .collect::<Result<Vec<_>, _>>()
2604            .unwrap();
2605
2606        assert_eq!(&written.slice(0, 3), &read[0]);
2607        assert_eq!(&written.slice(3, 3), &read[1]);
2608        assert_eq!(&written.slice(6, 2), &read[2]);
2609    }
2610
2611    #[test]
2612    fn test_decimal_list() {
2613        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2614
2615        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2616        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2617            decimals.data_type().clone(),
2618            false,
2619        ))))
2620        .len(7)
2621        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2622        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2623        .child_data(vec![decimals.into_data()])
2624        .build()
2625        .unwrap();
2626
2627        let written =
2628            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2629                .unwrap();
2630
2631        let mut buffer = Vec::with_capacity(1024);
2632        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2633        writer.write(&written).unwrap();
2634        writer.close().unwrap();
2635
2636        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2637            .unwrap()
2638            .collect::<Result<Vec<_>, _>>()
2639            .unwrap();
2640
2641        assert_eq!(&written.slice(0, 3), &read[0]);
2642        assert_eq!(&written.slice(3, 3), &read[1]);
2643        assert_eq!(&written.slice(6, 1), &read[2]);
2644    }
2645
2646    #[test]
2647    fn test_read_decimal_file() {
2648        use arrow_array::Decimal128Array;
2649        let testdata = arrow::util::test_util::parquet_test_data();
2650        let file_variants = vec![
2651            ("byte_array", 4),
2652            ("fixed_length", 25),
2653            ("int32", 4),
2654            ("int64", 10),
2655        ];
2656        for (prefix, target_precision) in file_variants {
2657            let path = format!("{testdata}/{prefix}_decimal.parquet");
2658            let file = File::open(path).unwrap();
2659            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2660
2661            let batch = record_reader.next().unwrap().unwrap();
2662            assert_eq!(batch.num_rows(), 24);
2663            let col = batch
2664                .column(0)
2665                .as_any()
2666                .downcast_ref::<Decimal128Array>()
2667                .unwrap();
2668
2669            let expected = 1..25;
2670
2671            assert_eq!(col.precision(), target_precision);
2672            assert_eq!(col.scale(), 2);
2673
2674            for (i, v) in expected.enumerate() {
2675                assert_eq!(col.value(i), v * 100_i128);
2676            }
2677        }
2678    }
2679
2680    #[test]
2681    fn test_read_float16_nonzeros_file() {
2682        use arrow_array::Float16Array;
2683        let testdata = arrow::util::test_util::parquet_test_data();
2684        // see https://github.com/apache/parquet-testing/pull/40
2685        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2686        let file = File::open(path).unwrap();
2687        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2688
2689        let batch = record_reader.next().unwrap().unwrap();
2690        assert_eq!(batch.num_rows(), 8);
2691        let col = batch
2692            .column(0)
2693            .as_any()
2694            .downcast_ref::<Float16Array>()
2695            .unwrap();
2696
2697        let f16_two = f16::ONE + f16::ONE;
2698
2699        assert_eq!(col.null_count(), 1);
2700        assert!(col.is_null(0));
2701        assert_eq!(col.value(1), f16::ONE);
2702        assert_eq!(col.value(2), -f16_two);
2703        assert!(col.value(3).is_nan());
2704        assert_eq!(col.value(4), f16::ZERO);
2705        assert!(col.value(4).is_sign_positive());
2706        assert_eq!(col.value(5), f16::NEG_ONE);
2707        assert_eq!(col.value(6), f16::NEG_ZERO);
2708        assert!(col.value(6).is_sign_negative());
2709        assert_eq!(col.value(7), f16_two);
2710    }
2711
2712    #[test]
2713    fn test_read_float16_zeros_file() {
2714        use arrow_array::Float16Array;
2715        let testdata = arrow::util::test_util::parquet_test_data();
2716        // see https://github.com/apache/parquet-testing/pull/40
2717        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2718        let file = File::open(path).unwrap();
2719        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2720
2721        let batch = record_reader.next().unwrap().unwrap();
2722        assert_eq!(batch.num_rows(), 3);
2723        let col = batch
2724            .column(0)
2725            .as_any()
2726            .downcast_ref::<Float16Array>()
2727            .unwrap();
2728
2729        assert_eq!(col.null_count(), 1);
2730        assert!(col.is_null(0));
2731        assert_eq!(col.value(1), f16::ZERO);
2732        assert!(col.value(1).is_sign_positive());
2733        assert!(col.value(2).is_nan());
2734    }
2735
2736    #[test]
2737    fn test_read_float32_float64_byte_stream_split() {
2738        let path = format!(
2739            "{}/byte_stream_split.zstd.parquet",
2740            arrow::util::test_util::parquet_test_data(),
2741        );
2742        let file = File::open(path).unwrap();
2743        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2744
2745        let mut row_count = 0;
2746        for batch in record_reader {
2747            let batch = batch.unwrap();
2748            row_count += batch.num_rows();
2749            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2750            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2751
2752            // This file contains floats from a standard normal distribution
2753            for &x in f32_col.values() {
2754                assert!(x > -10.0);
2755                assert!(x < 10.0);
2756            }
2757            for &x in f64_col.values() {
2758                assert!(x > -10.0);
2759                assert!(x < 10.0);
2760            }
2761        }
2762        assert_eq!(row_count, 300);
2763    }
2764
2765    #[test]
2766    fn test_read_extended_byte_stream_split() {
2767        let path = format!(
2768            "{}/byte_stream_split_extended.gzip.parquet",
2769            arrow::util::test_util::parquet_test_data(),
2770        );
2771        let file = File::open(path).unwrap();
2772        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2773
2774        let mut row_count = 0;
2775        for batch in record_reader {
2776            let batch = batch.unwrap();
2777            row_count += batch.num_rows();
2778
2779            // 0,1 are f16
2780            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2781            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2782            assert_eq!(f16_col.len(), f16_bss.len());
2783            f16_col
2784                .iter()
2785                .zip(f16_bss.iter())
2786                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2787
2788            // 2,3 are f32
2789            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2790            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2791            assert_eq!(f32_col.len(), f32_bss.len());
2792            f32_col
2793                .iter()
2794                .zip(f32_bss.iter())
2795                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2796
2797            // 4,5 are f64
2798            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2799            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2800            assert_eq!(f64_col.len(), f64_bss.len());
2801            f64_col
2802                .iter()
2803                .zip(f64_bss.iter())
2804                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2805
2806            // 6,7 are i32
2807            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2808            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2809            assert_eq!(i32_col.len(), i32_bss.len());
2810            i32_col
2811                .iter()
2812                .zip(i32_bss.iter())
2813                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2814
2815            // 8,9 are i64
2816            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2817            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2818            assert_eq!(i64_col.len(), i64_bss.len());
2819            i64_col
2820                .iter()
2821                .zip(i64_bss.iter())
2822                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2823
2824            // 10,11 are FLBA(5)
2825            let flba_col = batch.column(10).as_fixed_size_binary();
2826            let flba_bss = batch.column(11).as_fixed_size_binary();
2827            assert_eq!(flba_col.len(), flba_bss.len());
2828            flba_col
2829                .iter()
2830                .zip(flba_bss.iter())
2831                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2832
2833            // 12,13 are FLBA(4) (decimal(7,3))
2834            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2835            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2836            assert_eq!(dec_col.len(), dec_bss.len());
2837            dec_col
2838                .iter()
2839                .zip(dec_bss.iter())
2840                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2841        }
2842        assert_eq!(row_count, 200);
2843    }
2844
2845    #[test]
2846    fn test_read_incorrect_map_schema_file() {
2847        let testdata = arrow::util::test_util::parquet_test_data();
2848        // see https://github.com/apache/parquet-testing/pull/47
2849        let path = format!("{testdata}/incorrect_map_schema.parquet");
2850        let file = File::open(path).unwrap();
2851        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2852
2853        let batch = record_reader.next().unwrap().unwrap();
2854        assert_eq!(batch.num_rows(), 1);
2855
2856        let expected_schema = Schema::new(vec![Field::new(
2857            "my_map",
2858            ArrowDataType::Map(
2859                Arc::new(Field::new(
2860                    "key_value",
2861                    ArrowDataType::Struct(Fields::from(vec![
2862                        Field::new("key", ArrowDataType::Utf8, false),
2863                        Field::new("value", ArrowDataType::Utf8, true),
2864                    ])),
2865                    false,
2866                )),
2867                false,
2868            ),
2869            true,
2870        )]);
2871        assert_eq!(batch.schema().as_ref(), &expected_schema);
2872
2873        assert_eq!(batch.num_rows(), 1);
2874        assert_eq!(batch.column(0).null_count(), 0);
2875        assert_eq!(
2876            batch.column(0).as_map().keys().as_ref(),
2877            &StringArray::from(vec!["parent", "name"])
2878        );
2879        assert_eq!(
2880            batch.column(0).as_map().values().as_ref(),
2881            &StringArray::from(vec!["another", "report"])
2882        );
2883    }
2884
2885    #[test]
2886    fn test_read_dict_fixed_size_binary() {
2887        let schema = Arc::new(Schema::new(vec![Field::new(
2888            "a",
2889            ArrowDataType::Dictionary(
2890                Box::new(ArrowDataType::UInt8),
2891                Box::new(ArrowDataType::FixedSizeBinary(8)),
2892            ),
2893            true,
2894        )]));
2895        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2896        let values = FixedSizeBinaryArray::try_from_iter(
2897            vec![
2898                (0u8..8u8).collect::<Vec<u8>>(),
2899                (24u8..32u8).collect::<Vec<u8>>(),
2900            ]
2901            .into_iter(),
2902        )
2903        .unwrap();
2904        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2905        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2906
2907        let mut buffer = Vec::with_capacity(1024);
2908        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2909        writer.write(&batch).unwrap();
2910        writer.close().unwrap();
2911        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2912            .unwrap()
2913            .collect::<Result<Vec<_>, _>>()
2914            .unwrap();
2915
2916        assert_eq!(read.len(), 1);
2917        assert_eq!(&batch, &read[0])
2918    }
2919
2920    #[test]
2921    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2922        // the `StructArrayReader` will check the definition and repetition levels of the first
2923        // child column in the struct to determine nullability for the struct. If the first
2924        // column's is being read by `ByteArrayDictionaryReader` we need to ensure that the
2925        // nullability is interpreted  correctly from the rep/def level buffers managed by the
2926        // buffers managed by this array reader.
2927
2928        let struct_fields = Fields::from(vec![
2929            Field::new(
2930                "city",
2931                ArrowDataType::Dictionary(
2932                    Box::new(ArrowDataType::UInt8),
2933                    Box::new(ArrowDataType::Utf8),
2934                ),
2935                true,
2936            ),
2937            Field::new("name", ArrowDataType::Utf8, true),
2938        ]);
2939        let schema = Arc::new(Schema::new(vec![Field::new(
2940            "items",
2941            ArrowDataType::Struct(struct_fields.clone()),
2942            true,
2943        )]));
2944
2945        let items_arr = StructArray::new(
2946            struct_fields,
2947            vec![
2948                Arc::new(DictionaryArray::new(
2949                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2950                    Arc::new(StringArray::from_iter_values(vec![
2951                        "quebec",
2952                        "fredericton",
2953                        "halifax",
2954                    ])),
2955                )),
2956                Arc::new(StringArray::from_iter_values(vec![
2957                    "albert", "terry", "lance", "", "tim",
2958                ])),
2959            ],
2960            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2961        );
2962
2963        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2964        let mut buffer = Vec::with_capacity(1024);
2965        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2966        writer.write(&batch).unwrap();
2967        writer.close().unwrap();
2968        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2969            .unwrap()
2970            .collect::<Result<Vec<_>, _>>()
2971            .unwrap();
2972
2973        assert_eq!(read.len(), 1);
2974        assert_eq!(&batch, &read[0])
2975    }
2976
    /// Parameters for single_column_reader_test
    ///
    /// Groups everything that controls how the test data is written (row groups, pages,
    /// encoding, statistics) and how it is read back (batch size, selection, filter,
    /// limit, offset).
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write to parquet
        ///
        /// NOTE(review): presumably the file holds `num_row_groups * num_rows` rows in
        /// total, since `num_rows` is documented as per row group — confirm against the
        /// test driver.
        num_row_groups: usize,
        /// Total number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in column or None if required
        null_percent: Option<usize>,
        /// Set write batch size
        ///
        /// This is the number of rows that are written at once to a page and
        /// therefore acts as a bound on the page granularity of a row group
        write_batch_size: usize,
        /// Maximum size of page in bytes
        max_data_page_size: usize,
        /// Maximum size of dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// Row selections and total selected row count, or None for no selection
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter, or None for no filter
        ///
        /// NOTE(review): presumably one flag per row saying whether it is kept — confirm
        /// against the test driver.
        row_filter: Option<Vec<bool>>,
        /// Limit, or None for no limit
        limit: Option<usize>,
        /// Offset, or None for no offset
        offset: Option<usize>,
    }
3013
3014    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
3015    impl std::fmt::Debug for TestOptions {
3016        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3017            f.debug_struct("TestOptions")
3018                .field("num_row_groups", &self.num_row_groups)
3019                .field("num_rows", &self.num_rows)
3020                .field("record_batch_size", &self.record_batch_size)
3021                .field("null_percent", &self.null_percent)
3022                .field("write_batch_size", &self.write_batch_size)
3023                .field("max_data_page_size", &self.max_data_page_size)
3024                .field("max_dict_page_size", &self.max_dict_page_size)
3025                .field("writer_version", &self.writer_version)
3026                .field("enabled_statistics", &self.enabled_statistics)
3027                .field("encoding", &self.encoding)
3028                .field("row_selections", &self.row_selections.is_some())
3029                .field("row_filter", &self.row_filter.is_some())
3030                .field("limit", &self.limit)
3031                .field("offset", &self.offset)
3032                .finish()
3033        }
3034    }
3035
3036    impl Default for TestOptions {
3037        fn default() -> Self {
3038            Self {
3039                num_row_groups: 2,
3040                num_rows: 100,
3041                record_batch_size: 15,
3042                null_percent: None,
3043                write_batch_size: 64,
3044                max_data_page_size: 1024 * 1024,
3045                max_dict_page_size: 1024 * 1024,
3046                writer_version: WriterVersion::PARQUET_1_0,
3047                enabled_statistics: EnabledStatistics::Page,
3048                encoding: Encoding::PLAIN,
3049                row_selections: None,
3050                row_filter: None,
3051                limit: None,
3052                offset: None,
3053            }
3054        }
3055    }
3056
3057    impl TestOptions {
3058        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3059            Self {
3060                num_row_groups,
3061                num_rows,
3062                record_batch_size,
3063                ..Default::default()
3064            }
3065        }
3066
3067        fn with_null_percent(self, null_percent: usize) -> Self {
3068            Self {
3069                null_percent: Some(null_percent),
3070                ..self
3071            }
3072        }
3073
3074        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3075            Self {
3076                max_data_page_size,
3077                ..self
3078            }
3079        }
3080
3081        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3082            Self {
3083                max_dict_page_size,
3084                ..self
3085            }
3086        }
3087
3088        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3089            Self {
3090                enabled_statistics,
3091                ..self
3092            }
3093        }
3094
3095        fn with_row_selections(self) -> Self {
3096            assert!(self.row_filter.is_none(), "Must set row selection first");
3097
3098            let mut rng = rng();
3099            let step = rng.random_range(self.record_batch_size..self.num_rows);
3100            let row_selections = create_test_selection(
3101                step,
3102                self.num_row_groups * self.num_rows,
3103                rng.random::<bool>(),
3104            );
3105            Self {
3106                row_selections: Some(row_selections),
3107                ..self
3108            }
3109        }
3110
3111        fn with_row_filter(self) -> Self {
3112            let row_count = match &self.row_selections {
3113                Some((_, count)) => *count,
3114                None => self.num_row_groups * self.num_rows,
3115            };
3116
3117            let mut rng = rng();
3118            Self {
3119                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3120                ..self
3121            }
3122        }
3123
3124        fn with_limit(self, limit: usize) -> Self {
3125            Self {
3126                limit: Some(limit),
3127                ..self
3128            }
3129        }
3130
3131        fn with_offset(self, offset: usize) -> Self {
3132            Self {
3133                offset: Some(offset),
3134                ..self
3135            }
3136        }
3137
3138        fn writer_props(&self) -> WriterProperties {
3139            let builder = WriterProperties::builder()
3140                .set_data_page_size_limit(self.max_data_page_size)
3141                .set_write_batch_size(self.write_batch_size)
3142                .set_writer_version(self.writer_version)
3143                .set_statistics_enabled(self.enabled_statistics);
3144
3145            let builder = match self.encoding {
3146                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3147                    .set_dictionary_enabled(true)
3148                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3149                _ => builder
3150                    .set_dictionary_enabled(false)
3151                    .set_encoding(self.encoding),
3152            };
3153
3154            builder.build()
3155        }
3156    }
3157
3158    /// Create a parquet file and then read it using
3159    /// `ParquetFileArrowReader` using a standard set of parameters
3160    /// `opts`.
3161    ///
3162    /// `rand_max` represents the maximum size of value to pass to to
3163    /// value generator
3164    fn run_single_column_reader_tests<T, F, G>(
3165        rand_max: i32,
3166        converted_type: ConvertedType,
3167        arrow_type: Option<ArrowDataType>,
3168        converter: F,
3169        encodings: &[Encoding],
3170    ) where
3171        T: DataType,
3172        G: RandGen<T>,
3173        F: Fn(&[Option<T::T>]) -> ArrayRef,
3174    {
3175        let all_options = vec![
3176            // choose record_batch_batch (15) so batches cross row
3177            // group boundaries (50 rows in 2 row groups) cases.
3178            TestOptions::new(2, 100, 15),
3179            // choose record_batch_batch (5) so batches sometime fall
3180            // on row group boundaries and (25 rows in 3 row groups
3181            // --> row groups of 10, 10, and 5). Tests buffer
3182            // refilling edge cases.
3183            TestOptions::new(3, 25, 5),
3184            // Choose record_batch_size (25) so all batches fall
3185            // exactly on row group boundary (25). Tests buffer
3186            // refilling edge cases.
3187            TestOptions::new(4, 100, 25),
3188            // Set maximum page size so row groups have multiple pages
3189            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3190            // Set small dictionary page size to test dictionary fallback
3191            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3192            // Test optional but with no nulls
3193            TestOptions::new(2, 256, 127).with_null_percent(0),
3194            // Test optional with nulls
3195            TestOptions::new(2, 256, 93).with_null_percent(25),
3196            // Test with limit of 0
3197            TestOptions::new(4, 100, 25).with_limit(0),
3198            // Test with limit of 50
3199            TestOptions::new(4, 100, 25).with_limit(50),
3200            // Test with limit equal to number of rows
3201            TestOptions::new(4, 100, 25).with_limit(10),
3202            // Test with limit larger than number of rows
3203            TestOptions::new(4, 100, 25).with_limit(101),
3204            // Test with limit + offset equal to number of rows
3205            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3206            // Test with limit + offset equal to number of rows
3207            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3208            // Test with limit + offset larger than number of rows
3209            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3210            // Test with no page-level statistics
3211            TestOptions::new(2, 256, 91)
3212                .with_null_percent(25)
3213                .with_enabled_statistics(EnabledStatistics::Chunk),
3214            // Test with no statistics
3215            TestOptions::new(2, 256, 91)
3216                .with_null_percent(25)
3217                .with_enabled_statistics(EnabledStatistics::None),
3218            // Test with all null
3219            TestOptions::new(2, 128, 91)
3220                .with_null_percent(100)
3221                .with_enabled_statistics(EnabledStatistics::None),
3222            // Test skip
3223
3224            // choose record_batch_batch (15) so batches cross row
3225            // group boundaries (50 rows in 2 row groups) cases.
3226            TestOptions::new(2, 100, 15).with_row_selections(),
3227            // choose record_batch_batch (5) so batches sometime fall
3228            // on row group boundaries and (25 rows in 3 row groups
3229            // --> row groups of 10, 10, and 5). Tests buffer
3230            // refilling edge cases.
3231            TestOptions::new(3, 25, 5).with_row_selections(),
3232            // Choose record_batch_size (25) so all batches fall
3233            // exactly on row group boundary (25). Tests buffer
3234            // refilling edge cases.
3235            TestOptions::new(4, 100, 25).with_row_selections(),
3236            // Set maximum page size so row groups have multiple pages
3237            TestOptions::new(3, 256, 73)
3238                .with_max_data_page_size(128)
3239                .with_row_selections(),
3240            // Set small dictionary page size to test dictionary fallback
3241            TestOptions::new(3, 256, 57)
3242                .with_max_dict_page_size(128)
3243                .with_row_selections(),
3244            // Test optional but with no nulls
3245            TestOptions::new(2, 256, 127)
3246                .with_null_percent(0)
3247                .with_row_selections(),
3248            // Test optional with nulls
3249            TestOptions::new(2, 256, 93)
3250                .with_null_percent(25)
3251                .with_row_selections(),
3252            // Test optional with nulls
3253            TestOptions::new(2, 256, 93)
3254                .with_null_percent(25)
3255                .with_row_selections()
3256                .with_limit(10),
3257            // Test optional with nulls
3258            TestOptions::new(2, 256, 93)
3259                .with_null_percent(25)
3260                .with_row_selections()
3261                .with_offset(20)
3262                .with_limit(10),
3263            // Test filter
3264
3265            // Test with row filter
3266            TestOptions::new(4, 100, 25).with_row_filter(),
3267            // Test with row selection and row filter
3268            TestOptions::new(4, 100, 25)
3269                .with_row_selections()
3270                .with_row_filter(),
3271            // Test with nulls and row filter
3272            TestOptions::new(2, 256, 93)
3273                .with_null_percent(25)
3274                .with_max_data_page_size(10)
3275                .with_row_filter(),
3276            // Test with nulls and row filter and small pages
3277            TestOptions::new(2, 256, 93)
3278                .with_null_percent(25)
3279                .with_max_data_page_size(10)
3280                .with_row_selections()
3281                .with_row_filter(),
3282            // Test with row selection and no offset index and small pages
3283            TestOptions::new(2, 256, 93)
3284                .with_enabled_statistics(EnabledStatistics::None)
3285                .with_max_data_page_size(10)
3286                .with_row_selections(),
3287        ];
3288
3289        all_options.into_iter().for_each(|opts| {
3290            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3291                for encoding in encodings {
3292                    let opts = TestOptions {
3293                        writer_version,
3294                        encoding: *encoding,
3295                        ..opts.clone()
3296                    };
3297
3298                    single_column_reader_test::<T, _, G>(
3299                        opts,
3300                        rand_max,
3301                        converted_type,
3302                        arrow_type.clone(),
3303                        &converter,
3304                    )
3305                }
3306            }
3307        });
3308    }
3309
/// Writes a single-column parquet file as described by `opts`, reads it back
/// through `ParquetRecordBatchReaderBuilder`, and asserts that every batch
/// matches independently computed expected data.
///
/// The expected data is derived from the generated values by applying, in
/// order: the row selection, the row filter, the offset and finally the limit.
///
/// * `rand_max` - passed to the value generator `G`; also used as the type
///   length when the physical type is `FIXED_LEN_BYTE_ARRAY`
/// * `converted_type` - converted type set on the parquet leaf column
/// * `arrow_type` - optional arrow type; when set, a field of that type is
///   handed to the file generator
/// * `converter` - builds the expected arrow array from a slice of optional values
///
/// Panics if writing or reading fails, or if any batch differs from the
/// expected data in length, data type, contents or concrete array type.
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    // Print out options to facilitate debugging failures on CI
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );

    // Generate definition levels according to null_percent: one vector per
    // row group, where 1 marks a present value and 0 marks a null. Without a
    // null_percent the column is REQUIRED and has no definition levels.
    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();

            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

    // Generate random values for each row group; only non-null rows get a value
    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    // Type length for the leaf column; -1 is passed for every physical type
    // other than FIXED_LEN_BYTE_ARRAY and INT96
    // NOTE(review): -1 presumably means "no fixed length" - confirm against the type builder
    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    // Optional arrow field for the leaf column, handed to the file generator
    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(), // Cannot use &mut File (#1163)
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    // The writer used a clone of the handle; rewind before reading
    file.rewind().unwrap();

    // The page index policy is derived from whether page-level statistics were written
    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
        opts.enabled_statistics == EnabledStatistics::Page,
    ));

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    // Step 1: apply the row selection (if any) to both the reader and the expected data
    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            // Walk the selectors in order, consuming rows from the front:
            // skipped runs are discarded, selected runs are kept
            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            // The selection must select exactly the advertised number of rows
            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            // No selection: the expected data is the flattened table
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    // Step 2: apply the row filter (if any)
    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            // Keep only the rows whose flag is true
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            // The predicate replays the precomputed flags: each call consumes
            // as many flags as the batch has rows, continuing where the
            // previous call stopped. This assumes the reader evaluates the
            // predicate over the selected rows in order.
            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    // Step 3: apply the offset to the rows remaining after selection and filter
    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    // Step 4: apply the limit last
    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    // Every batch except possibly the last must contain exactly
    // record_batch_size rows, and once all expected rows have been seen the
    // reader must report the end of the stream
    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = batch.column(0);

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            // Equal data is not enough: the concrete array type must match too
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            assert!(maybe_batch.is_none());
            break;
        }
    }
}
3505
3506    fn gen_expected_data<T: DataType>(
3507        def_levels: Option<&Vec<Vec<i16>>>,
3508        values: &[Vec<T::T>],
3509    ) -> Vec<Option<T::T>> {
3510        let data: Vec<Option<T::T>> = match def_levels {
3511            Some(levels) => {
3512                let mut values_iter = values.iter().flatten();
3513                levels
3514                    .iter()
3515                    .flatten()
3516                    .map(|d| match d {
3517                        1 => Some(values_iter.next().cloned().unwrap()),
3518                        0 => None,
3519                        _ => unreachable!(),
3520                    })
3521                    .collect()
3522            }
3523            None => values.iter().flatten().cloned().map(Some).collect(),
3524        };
3525        data
3526    }
3527
3528    fn generate_single_column_file_with_data<T: DataType>(
3529        values: &[Vec<T::T>],
3530        def_levels: Option<&Vec<Vec<i16>>>,
3531        file: File,
3532        schema: TypePtr,
3533        field: Option<Field>,
3534        opts: &TestOptions,
3535    ) -> Result<ParquetMetaData> {
3536        let mut writer_props = opts.writer_props();
3537        if let Some(field) = field {
3538            let arrow_schema = Schema::new(vec![field]);
3539            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3540        }
3541
3542        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3543
3544        for (idx, v) in values.iter().enumerate() {
3545            let def_levels = def_levels.map(|d| d[idx].as_slice());
3546            let mut row_group_writer = writer.next_row_group()?;
3547            {
3548                let mut column_writer = row_group_writer
3549                    .next_column()?
3550                    .expect("Column writer is none!");
3551
3552                column_writer
3553                    .typed::<T>()
3554                    .write_batch(v, def_levels, None)?;
3555
3556                column_writer.close()?;
3557            }
3558            row_group_writer.close()?;
3559        }
3560
3561        writer.close()
3562    }
3563
3564    fn get_test_file(file_name: &str) -> File {
3565        let mut path = PathBuf::new();
3566        path.push(arrow::util::test_util::arrow_test_data());
3567        path.push(file_name);
3568
3569        File::open(path.as_path()).expect("File not found!")
3570    }
3571
#[test]
fn test_read_structs() {
    // This particular test file has columns of struct types where there is
    // a column that has the same name as one of the struct fields
    // (see: ARROW-11452)
    let path = format!(
        "{}/nested_structs.rust.parquet",
        arrow::util::test_util::parquet_test_data()
    );

    // Reading every column must succeed for every batch
    let full_file = File::open(&path).unwrap();
    let full_reader = ParquetRecordBatchReader::try_new(full_file, 60).unwrap();
    full_reader.for_each(|batch| {
        batch.unwrap();
    });

    // Read again, projecting three leaves selected by index
    let projected_file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(projected_file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "count",
        ArrowDataType::UInt64,
        false,
    )]));
    let pc_cur_type = ArrowDataType::Struct(Fields::from(vec![
        Field::new("mean", ArrowDataType::Int64, false),
        Field::new("sum", ArrowDataType::Int64, false),
    ]));
    let expected_schema = Schema::new(vec![
        Field::new("roll_num", roll_num_type, false),
        Field::new("PC_CUR", pc_cur_type, false),
    ]);

    // Tests for #1652 and #1654
    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
3624
#[test]
// same as test_read_structs but constructs projection mask via column names
fn test_read_structs_by_name() {
    let path = format!(
        "{}/nested_structs.rust.parquet",
        arrow::util::test_util::parquet_test_data()
    );

    // Reading every column must succeed for every batch
    let full_file = File::open(&path).unwrap();
    let full_reader = ParquetRecordBatchReader::try_new(full_file, 60).unwrap();
    full_reader.for_each(|batch| {
        batch.unwrap();
    });

    // Read again, projecting three leaves selected by dotted column path
    let projected_file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(projected_file).unwrap();
    let projected_columns = ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"];
    let mask = ProjectionMask::columns(builder.parquet_schema(), projected_columns);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "count",
        ArrowDataType::UInt64,
        false,
    )]));
    let pc_cur_type = ArrowDataType::Struct(Fields::from(vec![
        Field::new("mean", ArrowDataType::Int64, false),
        Field::new("sum", ArrowDataType::Int64, false),
    ]));
    let expected_schema = Schema::new(vec![
        Field::new("roll_num", roll_num_type, false),
        Field::new("PC_CUR", pc_cur_type, false),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
3677
#[test]
fn test_read_maps() {
    // Every batch of the nested-maps file must decode without error
    let path = format!(
        "{}/nested_maps.snappy.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();

    reader.for_each(|batch| {
        batch.unwrap();
    });
}
3689
// test that we can handle the UNKNOWN logical type annotation on any physical type
#[test]
fn test_unknown_logical_type() {
    /// Writes the next column of the row group as four nulls
    fn write_nulls<T: DataType>(row_group_writer: &mut SerializedRowGroupWriter<'_, File>) {
        let mut column = row_group_writer.next_column().unwrap().unwrap();
        column
            .typed::<T>()
            .write_batch(&[], Some(&[0, 0, 0, 0]), None)
            .unwrap();
        column.close().unwrap();
    }

    let message_type = "message uk {
            OPTIONAL INT32 uki32 (UNKNOWN);
            OPTIONAL INT64 uki64 (UNKNOWN);
            OPTIONAL INT96 uki96 (UNKNOWN);
            OPTIONAL BOOLEAN ukbool (UNKNOWN);
            OPTIONAL FLOAT ukfloat (UNKNOWN);
            OPTIONAL DOUBLE ukdbl (UNKNOWN);
            OPTIONAL BYTE_ARRAY ukbytes (UNKNOWN);
            OPTIONAL FIXED_LEN_BYTE_ARRAY(10) ukflba (UNKNOWN);
        }";

    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let file = tempfile::tempfile().unwrap();

    let writer_handle = file.try_clone().unwrap();
    let mut writer =
        SerializedFileWriter::new(writer_handle, schema, Default::default()).unwrap();
    let mut row_group = writer.next_row_group().unwrap();

    // One call per column, in the order the schema declares them
    write_nulls::<Int32Type>(&mut row_group); // INT32
    write_nulls::<Int64Type>(&mut row_group); // INT64
    write_nulls::<Int96Type>(&mut row_group); // INT96
    write_nulls::<BoolType>(&mut row_group); // BOOLEAN
    write_nulls::<FloatType>(&mut row_group); // FLOAT
    write_nulls::<DoubleType>(&mut row_group); // DOUBLE
    write_nulls::<ByteArrayType>(&mut row_group); // BYTE_ARRAY
    write_nulls::<FixedLenByteArrayType>(&mut row_group); // FIXED_LEN_BYTE_ARRAY

    row_group.close().unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(file, 4).unwrap();
    let batch = reader.next().unwrap().unwrap();

    // Every column must come back as four nulls of the arrow Null type
    batch.columns().iter().for_each(|col| {
        assert_eq!(col.len(), 4);
        assert_eq!(col.logical_null_count(), 4);
        assert_eq!(*col.data_type(), ArrowDataType::Null);
    });
}
3760
#[test]
fn test_nested_nullability() {
    let message_type = "message nested {
          OPTIONAL Group group {
            REQUIRED INT32 leaf;
          }
        }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Write using low-level parquet API (#1167)
    let writer_handle = file.try_clone().unwrap();
    let mut writer =
        SerializedFileWriter::new(writer_handle, schema, Default::default()).unwrap();

    let mut row_group = writer.next_row_group().unwrap();
    let mut column = row_group.next_column().unwrap().unwrap();
    // Four rows: definition level 0 marks a null group, 1 a present leaf
    column
        .typed::<Int32Type>()
        .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
        .unwrap();
    column.close().unwrap();
    row_group.close().unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut reader = builder.with_projection(mask).build().unwrap();

    // The optional group maps to a nullable struct with a non-nullable leaf
    let leaf = Field::new("leaf", ArrowDataType::Int32, false);
    let group_type = ArrowDataType::Struct(Fields::from(vec![leaf]));
    let expected_schema = Schema::new(vec![Field::new("group", group_type, true)]);

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}
3810
/// Reads a dictionary-encoded string column in batches of three rows and checks
/// which batches share a dictionary and which do not.
#[test]
fn test_dictionary_preservation() {
    let leaf = Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::OPTIONAL)
        .with_converted_type(ConvertedType::UTF8)
        .build()
        .unwrap();

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(vec![Arc::new(leaf)])
            .build()
            .unwrap(),
    );

    let arrow_field = Field::new(
        "leaf",
        ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        ),
        true,
    );

    let mut file = tempfile::tempfile().unwrap();

    // One inner vector of values (and of definition levels) per entry handed to the generator.
    let to_byte_arrays =
        |strs: &[&str]| -> Vec<ByteArray> { strs.iter().map(|s| ByteArray::from(*s)).collect() };
    let values = vec![
        to_byte_arrays(&["hello", "a", "b", "d"]),
        to_byte_arrays(&["c", "a", "b"]),
    ];

    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(), // Cannot use &mut File (#1163)
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 3)
        .unwrap()
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let row_counts: Vec<_> = batches
        .iter()
        .map(|batch| (batch.num_rows(), batch.column(0).null_count()))
        .collect();

    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    // Dictionary values of each batch, in batch order.
    let dicts: Vec<_> = batches
        .iter()
        .map(|batch| batch.column(0).to_data().child_data()[0].clone())
        .collect();

    // First and second batch in same row group -> same dictionary
    assert_eq!(dicts[0], dicts[1]);
    // Third batch spans row group -> computed dictionary
    assert_ne!(dicts[1], dicts[2]);
    assert_ne!(dicts[2], dicts[3]);
    // Fourth, fifth and sixth from same row group -> same dictionary
    assert_eq!(dicts[3], dicts[4]);
    assert_eq!(dicts[4], dicts[5]);
}
3903
/// `null_list.parquet` must decode to a single row holding a valid, empty list.
#[test]
fn test_read_null_list() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/null_list.parquet")).unwrap();
    let mut reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);

    let column = batch.column(0);
    assert_eq!(column.len(), 1);

    // Panics if the column is not an i32-offset list, like the downcast + unwrap it replaces.
    let list = column.as_list::<i32>();
    assert_eq!(list.len(), 1);
    assert!(list.is_valid(0));

    let first = list.value(0);
    assert_eq!(first.len(), 0);
}
3927
/// With the embedded arrow metadata skipped, the schema derived for
/// `null_list.parquet` is a nullable list of nullable `Null` items.
#[test]
fn test_null_schema_inference() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/null_list.parquet")).unwrap();

    let item_field = Arc::new(Field::new_list_field(ArrowDataType::Null, true));
    let expected_field = Field::new("emptylist", ArrowDataType::List(item_field), true);

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    let inferred = builder.schema();
    assert_eq!(inferred.fields().len(), 1);
    assert_eq!(inferred.field(0), &expected_field);
}
3946
/// Field metadata written by `ArrowWriter` is restored by default and dropped
/// when `with_skip_arrow_metadata(true)` is set, for both writer versions.
#[test]
fn test_skip_metadata() {
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let plain_field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![plain_field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();
    let schema_with_metadata = Arc::new(Schema::new(vec![plain_field.with_metadata(metadata)]));

    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    // Writes `batch` to a fresh temporary file using the requested writer version.
    let write_file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let written = write_file(version);

        // Default options: the arrow schema (including field metadata) is recovered.
        let default_reader =
            ParquetRecordBatchReader::try_new(written.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(default_reader.schema(), schema_with_metadata);

        // Skipping the arrow metadata yields the schema without field metadata.
        let skipping_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(written, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(skipping_reader.schema(), schema_without_metadata);
    }
}
4005
4006    fn write_parquet_from_iter<I, F>(value: I) -> File
4007    where
4008        I: IntoIterator<Item = (F, ArrayRef)>,
4009        F: AsRef<str>,
4010    {
4011        let batch = RecordBatch::try_from_iter(value).unwrap();
4012        let file = tempfile().unwrap();
4013        let mut writer =
4014            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4015        writer.write(&batch).unwrap();
4016        writer.close().unwrap();
4017        file
4018    }
4019
4020    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4021    where
4022        I: IntoIterator<Item = (F, ArrayRef)>,
4023        F: AsRef<str>,
4024    {
4025        let file = write_parquet_from_iter(value);
4026        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4027        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4028            file.try_clone().unwrap(),
4029            options_with_schema,
4030        );
4031        assert_eq!(builder.err().unwrap().to_string(), expected_error);
4032    }
4033
/// Supplying one field for a two-column file is rejected.
#[test]
fn test_schema_too_few_columns() {
    let written = vec![
        ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
        ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
    ];
    let supplied = Arc::new(Schema::new(vec![Field::new(
        "int64",
        ArrowDataType::Int64,
        false,
    )]));

    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}
4049
/// Supplying two fields for a one-column file is rejected.
#[test]
fn test_schema_too_many_columns() {
    let written = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied = Arc::new(Schema::new(vec![
        Field::new("int64", ArrowDataType::Int64, false),
        Field::new("int32", ArrowDataType::Int32, false),
    ]));

    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}
4061
/// A supplied field whose name differs from the file's column name is rejected.
#[test]
fn test_schema_mismatched_column_names() {
    let written = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied = Arc::new(Schema::new(vec![Field::new(
        "other",
        ArrowDataType::Int64,
        false,
    )]));

    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}
4074
/// Every column whose supplied type is incompatible is listed in the error;
/// the compatible middle column is not mentioned.
#[test]
fn test_schema_incompatible_columns() {
    let col1: ArrayRef = Arc::new(Int64Array::from(vec![0]));
    let col2: ArrayRef = Arc::new(Int32Array::from(vec![0]));
    let col3: ArrayRef = Arc::new(Date64Array::from(vec![0]));
    let written = vec![
        ("col1_invalid", col1),
        ("col2_valid", col2),
        ("col3_invalid", col3),
    ];

    // All three columns are requested as Int32.
    let supplied = Arc::new(Schema::new(vec![
        Field::new("col1_invalid", ArrowDataType::Int32, false),
        Field::new("col2_valid", ArrowDataType::Int32, false),
        Field::new("col3_invalid", ArrowDataType::Int32, false),
    ]));

    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}
4100
/// A single incompatible child inside a struct column is reported as a
/// mismatch on the whole struct field.
#[test]
fn test_one_incompatible_nested_column() {
    // Struct as written: (Utf8, Int64).
    let written_children: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a"])),
        Arc::new(Int64Array::from(vec![0])),
    ];
    let written_nested = StructArray::try_new(
        Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]),
        written_children,
        None,
    )
    .expect("struct array");

    // Struct as requested: the second child is asked for as Int32.
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    let supplied = Arc::new(Schema::new(vec![
        Field::new("col1", ArrowDataType::Int64, false),
        Field::new("col2", ArrowDataType::Int32, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let written = vec![
        ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
        ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ("nested", Arc::new(written_nested) as ArrayRef),
    ];

    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
        requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
        but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
    );
}
4140
4141    /// Return parquet data with a single column of utf8 strings
4142    fn utf8_parquet() -> Bytes {
4143        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4144        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4145        let props = None;
4146        // write parquet file with non nullable strings
4147        let mut parquet_data = vec![];
4148        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4149        writer.write(&batch).unwrap();
4150        writer.close().unwrap();
4151        Bytes::from(parquet_data)
4152    }
4153
/// Reading a Utf8 column with a supplied Int32 schema must fail when the
/// reader builder is created.
#[test]
fn test_schema_error_bad_types() {
    let parquet_data = utf8_parquet();

    // Incompatible request: int instead of string
    let requested_field = Field::new("column1", ArrowDataType::Int32, false);
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![requested_field]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema);
    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    );
}
4176
/// Reading a non-nullable column with a supplied nullable field must fail
/// when the reader builder is created.
#[test]
fn test_schema_error_bad_nullability() {
    let parquet_data = utf8_parquet();

    // Incompatible request: same type, but nullable instead of non-nullable
    let requested_field = Field::new("column1", ArrowDataType::Utf8, true);
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![requested_field]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema);
    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    );
}
4199
/// Binary, large-binary and binary-view columns holding valid UTF-8 can be
/// read back as the matching string types through a supplied schema.
#[test]
fn test_read_binary_as_utf8() {
    // The same three byte strings are written to every column.
    let raw_values = || -> Vec<&'static [u8]> {
        vec![b"one".as_ref(), b"two".as_ref(), b"three".as_ref()]
    };

    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(raw_values())) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(raw_values())) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(raw_values())) as ArrayRef,
        ),
    ]);

    let supplied_schema = Schema::new(Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]));

    let options = ArrowReaderOptions::new().with_schema(Arc::new(supplied_schema));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);

    let expected = vec![Some("one"), Some("two"), Some("three")];

    let utf8: Vec<_> = batch.column(0).as_string::<i32>().iter().collect();
    assert_eq!(utf8, expected);

    let large_utf8: Vec<_> = batch.column(1).as_string::<i64>().iter().collect();
    assert_eq!(large_utf8, expected);

    let utf8_view: Vec<_> = batch.column(2).as_string_view().iter().collect();
    assert_eq!(utf8_view, expected);
}
4273
/// Requesting Utf8 for a binary column that holds invalid UTF-8 is expected
/// to panic with an "Invalid UTF8 sequence at" message.
#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let invalid_utf8 = BinaryArray::from(vec![
        b"\xDE\x00\xFF".as_ref(),
        b"\xDE\x01\xAA".as_ref(),
        b"\xDE\x02\xFF".as_ref(),
    ]);
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(invalid_utf8) as ArrayRef,
    )]);

    let requested = Field::new("non_utf8_binary", ArrowDataType::Utf8, false);
    let supplied_schema = Schema::new(Fields::from(vec![requested]));

    let options = ArrowReaderOptions::new().with_schema(Arc::new(supplied_schema));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let first = arrow_reader.next().unwrap();
    first.unwrap_err();
}
4301
/// Reads a file back through a supplied schema that reinterprets columns
/// (Int32 as timestamp-seconds, Date32 as Date64, Utf8 as dictionary, Int64 as
/// timestamp-nanoseconds) and checks the decoded values.
#[test]
fn test_with_schema() {
    // ---- data as written ----
    let written_nested = StructArray::try_new(
        Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]),
        vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ],
        None,
    )
    .unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(written_nested) as ArrayRef),
    ]);

    // ---- schema as requested ----
    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );
    let ts_nano_type = ArrowDataType::Timestamp(
        arrow::datatypes::TimeUnit::Nanosecond,
        Some("+10:00".into()),
    );
    let ts_second_type =
        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into()));

    let supplied_nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", dict_type, false),
        Field::new("int64_to_ts_nano", ts_nano_type, false),
    ]);

    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new("int32_to_ts_second", ts_second_type, false),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    assert_eq!(arrow_reader.schema(), supplied_schema);

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    // ---- column 0: Int32 read as timestamp (seconds) ----
    let ts_second = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampSecondArray>()
        .expect("downcast to timestamp second");
    let first_ts_second = ts_second
        .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_ts_second, "1970-01-01 01:00:00 +01:00");

    // ---- column 1: Date32 read as Date64 ----
    let date64 = batch
        .column(1)
        .as_any()
        .downcast_ref::<Date64Array>()
        .expect("downcast to date64");
    let first_date = date64
        .value_as_date(0)
        .map(|v| v.to_string())
        .expect("value as date");
    assert_eq!(first_date, "1970-01-01");

    // ---- column 2: struct with reinterpreted children ----
    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    let dict_values: Vec<_> = nested_dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("downcast to string")
        .iter()
        .collect();
    assert_eq!(dict_values, vec![Some("a"), Some("b")]);

    let dict_keys: Vec<_> = nested_dict.keys().iter().collect();
    assert_eq!(dict_keys, vec![Some(0), Some(0), Some(0), Some(1)]);

    let ts_nano = nested
        .column(1)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("downcast to timestamp nanosecond");
    let first_ts_nano = ts_nano
        .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_ts_nano, "1970-01-01 10:00:00.000000001 +10:00");
}
4437
/// Projecting no leaves still yields every row, as column-less batches of at
/// most `batch_size` rows.
#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let expected_rows = builder.metadata().file_metadata().num_rows() as usize;

    let no_columns = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(no_columns)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        let rows = batch.num_rows();
        total_rows += rows;
        assert_eq!(batch.num_columns(), 0);
        assert!(rows <= 2);
    }

    assert_eq!(total_rows, expected_rows);
}
4465
4466    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4467        let schema = Arc::new(Schema::new(vec![Field::new(
4468            "list",
4469            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4470            true,
4471        )]));
4472
4473        let mut buf = Vec::with_capacity(1024);
4474
4475        let mut writer = ArrowWriter::try_new(
4476            &mut buf,
4477            schema.clone(),
4478            Some(
4479                WriterProperties::builder()
4480                    .set_max_row_group_row_count(Some(row_group_size))
4481                    .build(),
4482            ),
4483        )
4484        .unwrap();
4485        for _ in 0..2 {
4486            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4487            for _ in 0..(batch_size) {
4488                list_builder.append(true);
4489            }
4490            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4491                .unwrap();
4492            writer.write(&batch).unwrap();
4493        }
4494        writer.close().unwrap();
4495
4496        let mut record_reader =
4497            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4498        assert_eq!(
4499            batch_size,
4500            record_reader.next().unwrap().unwrap().num_rows()
4501        );
4502        assert_eq!(
4503            batch_size,
4504            record_reader.next().unwrap().unwrap().num_rows()
4505        );
4506    }
4507
/// Runs `test_row_group_batch` over `(row_group_size, batch_size)` pairs at
/// and around `REPETITION_LEVELS_BATCH_SIZE`.
#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;

    // (row_group_size, batch_size)
    let cases = [
        (8, 8),
        (10, 8),
        (8, 10),
        (BATCH_SIZE, BATCH_SIZE),
        (BATCH_SIZE + 1, BATCH_SIZE),
        (BATCH_SIZE, BATCH_SIZE + 1),
        (BATCH_SIZE, BATCH_SIZE - 1),
        (BATCH_SIZE - 1, BATCH_SIZE),
    ];

    for (row_group_size, batch_size) in cases {
        test_row_group_batch(row_group_size, batch_size);
    }
}
4520
4521    /// Given a RecordBatch containing all the column data, return the expected batches given
4522    /// a `batch_size` and `selection`
4523    fn get_expected_batches(
4524        column: &RecordBatch,
4525        selection: &RowSelection,
4526        batch_size: usize,
4527    ) -> Vec<RecordBatch> {
4528        let mut expected_batches = vec![];
4529
4530        let mut selection: VecDeque<_> = selection.clone().into();
4531        let mut row_offset = 0;
4532        let mut last_start = None;
4533        while row_offset < column.num_rows() && !selection.is_empty() {
4534            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4535            while batch_remaining > 0 && !selection.is_empty() {
4536                let (to_read, skip) = match selection.front_mut() {
4537                    Some(selection) if selection.row_count > batch_remaining => {
4538                        selection.row_count -= batch_remaining;
4539                        (batch_remaining, selection.skip)
4540                    }
4541                    Some(_) => {
4542                        let select = selection.pop_front().unwrap();
4543                        (select.row_count, select.skip)
4544                    }
4545                    None => break,
4546                };
4547
4548                batch_remaining -= to_read;
4549
4550                match skip {
4551                    true => {
4552                        if let Some(last_start) = last_start.take() {
4553                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4554                        }
4555                        row_offset += to_read
4556                    }
4557                    false => {
4558                        last_start.get_or_insert(row_offset);
4559                        row_offset += to_read
4560                    }
4561                }
4562            }
4563        }
4564
4565        if let Some(last_start) = last_start.take() {
4566            expected_batches.push(column.slice(last_start, row_offset - last_start))
4567        }
4568
4569        // Sanity check, all batches except the final should be the batch size
4570        for batch in &expected_batches[..expected_batches.len() - 1] {
4571            assert_eq!(batch.num_rows(), batch_size);
4572        }
4573
4574        expected_batches
4575    }
4576
4577    fn create_test_selection(
4578        step_len: usize,
4579        total_len: usize,
4580        skip_first: bool,
4581    ) -> (RowSelection, usize) {
4582        let mut remaining = total_len;
4583        let mut skip = skip_first;
4584        let mut vec = vec![];
4585        let mut selected_count = 0;
4586        while remaining != 0 {
4587            let step = if remaining > step_len {
4588                step_len
4589            } else {
4590                remaining
4591            };
4592            vec.push(RowSelector {
4593                row_count: step,
4594                skip,
4595            });
4596            remaining -= step;
4597            if !skip {
4598                selected_count += step;
4599            }
4600            skip = !skip;
4601        }
4602        (vec.into(), selected_count)
4603    }
4604
/// Compares batches read through an alternating skip/select `RowSelection`
/// (page index required) against slices of the fully-read file.
///
/// NOTE(review): `selection_len` is only used in the assertion message; the
/// selection itself is built with `batch_size` as the step, and cases 3 and 4
/// below are the same call. Confirm whether that is intended before changing
/// it; `get_expected_batches` asserts that all but the last expected batch
/// have exactly `batch_size` rows.
#[test]
fn test_scan_row_with_selection() {
    /// Opens `test_file` with the page index required and applies the given
    /// batch size and row selection.
    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        let options =
            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let file = test_file.try_clone().unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .with_batch_size(batch_size)
            .with_row_selection(selections);
        builder.build().unwrap()
    }

    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();

    // Reference data: the first batch read with a batch size of 7300.
    let mut serial_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
    let data = serial_reader.next().unwrap().unwrap();

    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let (selections, _selected) =
                create_test_selection(batch_size, data.num_rows(), skip_first);

            let expected = get_expected_batches(&data, &selections, batch_size);
            let actual = create_skip_reader(&test_file, batch_size, selections)
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(
                actual,
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };

    // total row count 7300
    // 1. test selection len more than one page row count
    do_test(1000, 1000);

    // 2. test selection len less than one page row count
    do_test(20, 20);

    // 3. test selection_len less than batch_size
    do_test(20, 5);

    // 4. test selection_len more than batch_size
    // If batch_size < selection_len
    do_test(20, 5);
}
4659
4660    #[test]
4661    fn test_batch_size_overallocate() {
4662        let testdata = arrow::util::test_util::parquet_test_data();
4663        // `alltypes_plain.parquet` only have 8 rows
4664        let path = format!("{testdata}/alltypes_plain.parquet");
4665        let test_file = File::open(path).unwrap();
4666
4667        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4668        let num_rows = builder.metadata.file_metadata().num_rows();
4669        let reader = builder
4670            .with_batch_size(1024)
4671            .with_projection(ProjectionMask::all())
4672            .build()
4673            .unwrap();
4674        assert_ne!(1024, num_rows);
4675        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4676    }
4677
4678    #[test]
4679    fn test_read_with_page_index_enabled() {
4680        let testdata = arrow::util::test_util::parquet_test_data();
4681
4682        {
4683            // `alltypes_tiny_pages.parquet` has page index
4684            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4685            let test_file = File::open(path).unwrap();
4686            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4687                test_file,
4688                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4689            )
4690            .unwrap();
4691            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4692            let reader = builder.build().unwrap();
4693            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4694            assert_eq!(batches.len(), 8);
4695        }
4696
4697        {
4698            // `alltypes_plain.parquet` doesn't have page index
4699            let path = format!("{testdata}/alltypes_plain.parquet");
4700            let test_file = File::open(path).unwrap();
4701            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4702                test_file,
4703                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4704            )
4705            .unwrap();
4706            // Although `Vec<Vec<PageLoacation>>` of each row group is empty,
4707            // we should read the file successfully.
4708            assert!(builder.metadata().offset_index().is_none());
4709            let reader = builder.build().unwrap();
4710            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4711            assert_eq!(batches.len(), 1);
4712        }
4713    }
4714
4715    #[test]
4716    fn test_raw_repetition() {
4717        const MESSAGE_TYPE: &str = "
4718            message Log {
4719              OPTIONAL INT32 eventType;
4720              REPEATED INT32 category;
4721              REPEATED group filter {
4722                OPTIONAL INT32 error;
4723              }
4724            }
4725        ";
4726        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4727        let props = Default::default();
4728
4729        let mut buf = Vec::with_capacity(1024);
4730        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4731        let mut row_group_writer = writer.next_row_group().unwrap();
4732
4733        // column 0
4734        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4735        col_writer
4736            .typed::<Int32Type>()
4737            .write_batch(&[1], Some(&[1]), None)
4738            .unwrap();
4739        col_writer.close().unwrap();
4740        // column 1
4741        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4742        col_writer
4743            .typed::<Int32Type>()
4744            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4745            .unwrap();
4746        col_writer.close().unwrap();
4747        // column 2
4748        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4749        col_writer
4750            .typed::<Int32Type>()
4751            .write_batch(&[1], Some(&[1]), Some(&[0]))
4752            .unwrap();
4753        col_writer.close().unwrap();
4754
4755        let rg_md = row_group_writer.close().unwrap();
4756        assert_eq!(rg_md.num_rows(), 1);
4757        writer.close().unwrap();
4758
4759        let bytes = Bytes::from(buf);
4760
4761        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4762        let full = no_mask.next().unwrap().unwrap();
4763
4764        assert_eq!(full.num_columns(), 3);
4765
4766        for idx in 0..3 {
4767            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4768            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4769            let mut reader = b.with_projection(mask).build().unwrap();
4770            let projected = reader.next().unwrap().unwrap();
4771
4772            assert_eq!(projected.num_columns(), 1);
4773            assert_eq!(full.column(idx), projected.column(0));
4774        }
4775    }
4776
4777    #[test]
4778    fn test_read_lz4_raw() {
4779        let testdata = arrow::util::test_util::parquet_test_data();
4780        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4781        let file = File::open(path).unwrap();
4782
4783        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4784            .unwrap()
4785            .collect::<Result<Vec<_>, _>>()
4786            .unwrap();
4787        assert_eq!(batches.len(), 1);
4788        let batch = &batches[0];
4789
4790        assert_eq!(batch.num_columns(), 3);
4791        assert_eq!(batch.num_rows(), 4);
4792
4793        // https://github.com/apache/parquet-testing/pull/18
4794        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4795        assert_eq!(
4796            a.values(),
4797            &[1593604800, 1593604800, 1593604801, 1593604801]
4798        );
4799
4800        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4801        let a: Vec<_> = a.iter().flatten().collect();
4802        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4803
4804        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4805        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4806    }
4807
4808    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
4809    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4810    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4811    //    LZ4_HADOOP algorithm for compression.
4812    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4813    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4814    //    older parquet-cpp versions.
4815    //
4816    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4817    #[test]
4818    fn test_read_lz4_hadoop_fallback() {
4819        for file in [
4820            "hadoop_lz4_compressed.parquet",
4821            "non_hadoop_lz4_compressed.parquet",
4822        ] {
4823            let testdata = arrow::util::test_util::parquet_test_data();
4824            let path = format!("{testdata}/{file}");
4825            let file = File::open(path).unwrap();
4826            let expected_rows = 4;
4827
4828            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4829                .unwrap()
4830                .collect::<Result<Vec<_>, _>>()
4831                .unwrap();
4832            assert_eq!(batches.len(), 1);
4833            let batch = &batches[0];
4834
4835            assert_eq!(batch.num_columns(), 3);
4836            assert_eq!(batch.num_rows(), expected_rows);
4837
4838            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4839            assert_eq!(
4840                a.values(),
4841                &[1593604800, 1593604800, 1593604801, 1593604801]
4842            );
4843
4844            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4845            let b: Vec<_> = b.iter().flatten().collect();
4846            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4847
4848            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4849            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4850        }
4851    }
4852
4853    #[test]
4854    fn test_read_lz4_hadoop_large() {
4855        let testdata = arrow::util::test_util::parquet_test_data();
4856        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4857        let file = File::open(path).unwrap();
4858        let expected_rows = 10000;
4859
4860        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4861            .unwrap()
4862            .collect::<Result<Vec<_>, _>>()
4863            .unwrap();
4864        assert_eq!(batches.len(), 1);
4865        let batch = &batches[0];
4866
4867        assert_eq!(batch.num_columns(), 1);
4868        assert_eq!(batch.num_rows(), expected_rows);
4869
4870        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4871        let a: Vec<_> = a.iter().flatten().collect();
4872        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4873        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4874        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4875        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4876    }
4877
4878    #[test]
4879    #[cfg(feature = "snap")]
4880    fn test_read_nested_lists() {
4881        let testdata = arrow::util::test_util::parquet_test_data();
4882        let path = format!("{testdata}/nested_lists.snappy.parquet");
4883        let file = File::open(path).unwrap();
4884
4885        let f = file.try_clone().unwrap();
4886        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4887        let expected = reader.next().unwrap().unwrap();
4888        assert_eq!(expected.num_rows(), 3);
4889
4890        let selection = RowSelection::from(vec![
4891            RowSelector::skip(1),
4892            RowSelector::select(1),
4893            RowSelector::skip(1),
4894        ]);
4895        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4896            .unwrap()
4897            .with_row_selection(selection)
4898            .build()
4899            .unwrap();
4900
4901        let actual = reader.next().unwrap().unwrap();
4902        assert_eq!(actual.num_rows(), 1);
4903        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4904    }
4905
4906    #[test]
4907    fn test_arbitrary_decimal() {
4908        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4909        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4910            .with_precision_and_scale(19, 0)
4911            .unwrap();
4912        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4913            .with_precision_and_scale(12, 0)
4914            .unwrap();
4915        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4916            .with_precision_and_scale(17, 10)
4917            .unwrap();
4918
4919        let written = RecordBatch::try_from_iter([
4920            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4921            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4922            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4923        ])
4924        .unwrap();
4925
4926        let mut buffer = Vec::with_capacity(1024);
4927        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4928        writer.write(&written).unwrap();
4929        writer.close().unwrap();
4930
4931        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4932            .unwrap()
4933            .collect::<Result<Vec<_>, _>>()
4934            .unwrap();
4935
4936        assert_eq!(&written.slice(0, 8), &read[0]);
4937    }
4938
4939    #[test]
4940    fn test_list_skip() {
4941        let mut list = ListBuilder::new(Int32Builder::new());
4942        list.append_value([Some(1), Some(2)]);
4943        list.append_value([Some(3)]);
4944        list.append_value([Some(4)]);
4945        let list = list.finish();
4946        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4947
4948        // First page contains 2 values but only 1 row
4949        let props = WriterProperties::builder()
4950            .set_data_page_row_count_limit(1)
4951            .set_write_batch_size(2)
4952            .build();
4953
4954        let mut buffer = Vec::with_capacity(1024);
4955        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4956        writer.write(&batch).unwrap();
4957        writer.close().unwrap();
4958
4959        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4960        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4961            .unwrap()
4962            .with_row_selection(selection.into())
4963            .build()
4964            .unwrap();
4965        let out = reader.next().unwrap().unwrap();
4966        assert_eq!(out.num_rows(), 1);
4967        assert_eq!(out, batch.slice(2, 1));
4968    }
4969
4970    fn test_decimal32_roundtrip() {
4971        let d = |values: Vec<i32>, p: u8| {
4972            let iter = values.into_iter();
4973            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4974                .with_precision_and_scale(p, 2)
4975                .unwrap()
4976        };
4977
4978        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4979        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4980
4981        let mut buffer = Vec::with_capacity(1024);
4982        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4983        writer.write(&batch).unwrap();
4984        writer.close().unwrap();
4985
4986        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4987        let t1 = builder.parquet_schema().columns()[0].physical_type();
4988        assert_eq!(t1, PhysicalType::INT32);
4989
4990        let mut reader = builder.build().unwrap();
4991        assert_eq!(batch.schema(), reader.schema());
4992
4993        let out = reader.next().unwrap().unwrap();
4994        assert_eq!(batch, out);
4995    }
4996
4997    fn test_decimal64_roundtrip() {
4998        // Precision <= 9 -> INT32
4999        // Precision <= 18 -> INT64
5000
5001        let d = |values: Vec<i64>, p: u8| {
5002            let iter = values.into_iter();
5003            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5004                .with_precision_and_scale(p, 2)
5005                .unwrap()
5006        };
5007
5008        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5009        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5010        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5011
5012        let batch = RecordBatch::try_from_iter([
5013            ("d1", Arc::new(d1) as ArrayRef),
5014            ("d2", Arc::new(d2) as ArrayRef),
5015            ("d3", Arc::new(d3) as ArrayRef),
5016        ])
5017        .unwrap();
5018
5019        let mut buffer = Vec::with_capacity(1024);
5020        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5021        writer.write(&batch).unwrap();
5022        writer.close().unwrap();
5023
5024        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5025        let t1 = builder.parquet_schema().columns()[0].physical_type();
5026        assert_eq!(t1, PhysicalType::INT32);
5027        let t2 = builder.parquet_schema().columns()[1].physical_type();
5028        assert_eq!(t2, PhysicalType::INT64);
5029        let t3 = builder.parquet_schema().columns()[2].physical_type();
5030        assert_eq!(t3, PhysicalType::INT64);
5031
5032        let mut reader = builder.build().unwrap();
5033        assert_eq!(batch.schema(), reader.schema());
5034
5035        let out = reader.next().unwrap().unwrap();
5036        assert_eq!(batch, out);
5037    }
5038
5039    fn test_decimal_roundtrip<T: DecimalType>() {
5040        // Precision <= 9 -> INT32
5041        // Precision <= 18 -> INT64
5042        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5043
5044        let d = |values: Vec<usize>, p: u8| {
5045            let iter = values.into_iter().map(T::Native::usize_as);
5046            PrimitiveArray::<T>::from_iter_values(iter)
5047                .with_precision_and_scale(p, 2)
5048                .unwrap()
5049        };
5050
5051        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5052        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5053        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5054        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5055
5056        let batch = RecordBatch::try_from_iter([
5057            ("d1", Arc::new(d1) as ArrayRef),
5058            ("d2", Arc::new(d2) as ArrayRef),
5059            ("d3", Arc::new(d3) as ArrayRef),
5060            ("d4", Arc::new(d4) as ArrayRef),
5061        ])
5062        .unwrap();
5063
5064        let mut buffer = Vec::with_capacity(1024);
5065        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5066        writer.write(&batch).unwrap();
5067        writer.close().unwrap();
5068
5069        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5070        let t1 = builder.parquet_schema().columns()[0].physical_type();
5071        assert_eq!(t1, PhysicalType::INT32);
5072        let t2 = builder.parquet_schema().columns()[1].physical_type();
5073        assert_eq!(t2, PhysicalType::INT64);
5074        let t3 = builder.parquet_schema().columns()[2].physical_type();
5075        assert_eq!(t3, PhysicalType::INT64);
5076        let t4 = builder.parquet_schema().columns()[3].physical_type();
5077        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5078
5079        let mut reader = builder.build().unwrap();
5080        assert_eq!(batch.schema(), reader.schema());
5081
5082        let out = reader.next().unwrap().unwrap();
5083        assert_eq!(batch, out);
5084    }
5085
5086    #[test]
5087    fn test_decimal() {
5088        test_decimal32_roundtrip();
5089        test_decimal64_roundtrip();
5090        test_decimal_roundtrip::<Decimal128Type>();
5091        test_decimal_roundtrip::<Decimal256Type>();
5092    }
5093
5094    #[test]
5095    fn test_list_selection() {
5096        let schema = Arc::new(Schema::new(vec![Field::new_list(
5097            "list",
5098            Field::new_list_field(ArrowDataType::Utf8, true),
5099            false,
5100        )]));
5101        let mut buf = Vec::with_capacity(1024);
5102
5103        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5104
5105        for i in 0..2 {
5106            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5107            for j in 0..1024 {
5108                list_a_builder.values().append_value(format!("{i} {j}"));
5109                list_a_builder.append(true);
5110            }
5111            let batch =
5112                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5113                    .unwrap();
5114            writer.write(&batch).unwrap();
5115        }
5116        let _metadata = writer.close().unwrap();
5117
5118        let buf = Bytes::from(buf);
5119        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5120            .unwrap()
5121            .with_row_selection(RowSelection::from(vec![
5122                RowSelector::skip(100),
5123                RowSelector::select(924),
5124                RowSelector::skip(100),
5125                RowSelector::select(924),
5126            ]))
5127            .build()
5128            .unwrap();
5129
5130        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5131        let batch = concat_batches(&schema, &batches).unwrap();
5132
5133        assert_eq!(batch.num_rows(), 924 * 2);
5134        let list = batch.column(0).as_list::<i32>();
5135
5136        for w in list.value_offsets().windows(2) {
5137            assert_eq!(w[0] + 1, w[1])
5138        }
5139        let mut values = list.values().as_string::<i32>().iter();
5140
5141        for i in 0..2 {
5142            for j in 100..1024 {
5143                let expected = format!("{i} {j}");
5144                assert_eq!(values.next().unwrap().unwrap(), &expected);
5145            }
5146        }
5147    }
5148
5149    #[test]
5150    fn test_list_selection_fuzz() {
5151        let mut rng = rng();
5152        let schema = Arc::new(Schema::new(vec![Field::new_list(
5153            "list",
5154            Field::new_list(
5155                Field::LIST_FIELD_DEFAULT_NAME,
5156                Field::new_list_field(ArrowDataType::Int32, true),
5157                true,
5158            ),
5159            true,
5160        )]));
5161        let mut buf = Vec::with_capacity(1024);
5162        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5163
5164        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5165
5166        for _ in 0..2048 {
5167            if rng.random_bool(0.2) {
5168                list_a_builder.append(false);
5169                continue;
5170            }
5171
5172            let list_a_len = rng.random_range(0..10);
5173            let list_b_builder = list_a_builder.values();
5174
5175            for _ in 0..list_a_len {
5176                if rng.random_bool(0.2) {
5177                    list_b_builder.append(false);
5178                    continue;
5179                }
5180
5181                let list_b_len = rng.random_range(0..10);
5182                let int_builder = list_b_builder.values();
5183                for _ in 0..list_b_len {
5184                    match rng.random_bool(0.2) {
5185                        true => int_builder.append_null(),
5186                        false => int_builder.append_value(rng.random()),
5187                    }
5188                }
5189                list_b_builder.append(true)
5190            }
5191            list_a_builder.append(true);
5192        }
5193
5194        let array = Arc::new(list_a_builder.finish());
5195        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5196
5197        writer.write(&batch).unwrap();
5198        let _metadata = writer.close().unwrap();
5199
5200        let buf = Bytes::from(buf);
5201
5202        let cases = [
5203            vec![
5204                RowSelector::skip(100),
5205                RowSelector::select(924),
5206                RowSelector::skip(100),
5207                RowSelector::select(924),
5208            ],
5209            vec![
5210                RowSelector::select(924),
5211                RowSelector::skip(100),
5212                RowSelector::select(924),
5213                RowSelector::skip(100),
5214            ],
5215            vec![
5216                RowSelector::skip(1023),
5217                RowSelector::select(1),
5218                RowSelector::skip(1023),
5219                RowSelector::select(1),
5220            ],
5221            vec![
5222                RowSelector::select(1),
5223                RowSelector::skip(1023),
5224                RowSelector::select(1),
5225                RowSelector::skip(1023),
5226            ],
5227        ];
5228
5229        for batch_size in [100, 1024, 2048] {
5230            for selection in &cases {
5231                let selection = RowSelection::from(selection.clone());
5232                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5233                    .unwrap()
5234                    .with_row_selection(selection.clone())
5235                    .with_batch_size(batch_size)
5236                    .build()
5237                    .unwrap();
5238
5239                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5240                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5241                assert_eq!(actual.num_rows(), selection.row_count());
5242
5243                let mut batch_offset = 0;
5244                let mut actual_offset = 0;
5245                for selector in selection.iter() {
5246                    if selector.skip {
5247                        batch_offset += selector.row_count;
5248                        continue;
5249                    }
5250
5251                    assert_eq!(
5252                        batch.slice(batch_offset, selector.row_count),
5253                        actual.slice(actual_offset, selector.row_count)
5254                    );
5255
5256                    batch_offset += selector.row_count;
5257                    actual_offset += selector.row_count;
5258                }
5259            }
5260        }
5261    }
5262
5263    #[test]
5264    fn test_read_old_nested_list() {
5265        use arrow::datatypes::DataType;
5266        use arrow::datatypes::ToByteSlice;
5267
5268        let testdata = arrow::util::test_util::parquet_test_data();
5269        // message my_record {
5270        //     REQUIRED group a (LIST) {
5271        //         REPEATED group array (LIST) {
5272        //             REPEATED INT32 array;
5273        //         }
5274        //     }
5275        // }
5276        // should be read as list<list<int32>>
5277        let path = format!("{testdata}/old_list_structure.parquet");
5278        let test_file = File::open(path).unwrap();
5279
5280        // create expected ListArray
5281        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5282
5283        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5284        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5285
5286        // Construct a list array from the above two
5287        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5288            "array",
5289            DataType::Int32,
5290            false,
5291        ))))
5292        .len(2)
5293        .add_buffer(a_value_offsets)
5294        .add_child_data(a_values.into_data())
5295        .build()
5296        .unwrap();
5297        let a = ListArray::from(a_list_data);
5298
5299        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5300        let mut reader = builder.build().unwrap();
5301        let out = reader.next().unwrap().unwrap();
5302        assert_eq!(out.num_rows(), 1);
5303        assert_eq!(out.num_columns(), 1);
5304        // grab first column
5305        let c0 = out.column(0);
5306        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5307        // get first row: [[1, 2], [3, 4]]
5308        let r0 = c0arr.value(0);
5309        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5310        assert_eq!(r0arr, &a);
5311    }
5312
5313    #[test]
5314    fn test_map_no_value() {
5315        // File schema:
5316        // message schema {
5317        //   required group my_map (MAP) {
5318        //     repeated group key_value {
5319        //       required int32 key;
5320        //       optional int32 value;
5321        //     }
5322        //   }
5323        //   required group my_map_no_v (MAP) {
5324        //     repeated group key_value {
5325        //       required int32 key;
5326        //     }
5327        //   }
5328        //   required group my_list (LIST) {
5329        //     repeated group list {
5330        //       required int32 element;
5331        //     }
5332        //   }
5333        // }
5334        let testdata = arrow::util::test_util::parquet_test_data();
5335        let path = format!("{testdata}/map_no_value.parquet");
5336        let file = File::open(path).unwrap();
5337
5338        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5339            .unwrap()
5340            .build()
5341            .unwrap();
5342        let out = reader.next().unwrap().unwrap();
5343        assert_eq!(out.num_rows(), 3);
5344        assert_eq!(out.num_columns(), 3);
5345        // my_map_no_v and my_list columns should now be equivalent
5346        let c0 = out.column(1).as_list::<i32>();
5347        let c1 = out.column(2).as_list::<i32>();
5348        assert_eq!(c0.len(), c1.len());
5349        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5350    }
5351
5352    #[test]
5353    fn test_read_unknown_logical_type() {
5354        let testdata = arrow::util::test_util::parquet_test_data();
5355        let path = format!("{testdata}/unknown-logical-type.parquet");
5356        let test_file = File::open(path).unwrap();
5357
5358        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5359            .expect("Error creating reader builder");
5360
5361        let schema = builder.metadata().file_metadata().schema_descr();
5362        assert_eq!(
5363            schema.column(0).logical_type_ref(),
5364            Some(&LogicalType::String)
5365        );
5366        assert_eq!(
5367            schema.column(1).logical_type_ref(),
5368            Some(&LogicalType::_Unknown { field_id: 2555 })
5369        );
5370        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5371
5372        let mut reader = builder.build().unwrap();
5373        let out = reader.next().unwrap().unwrap();
5374        assert_eq!(out.num_rows(), 3);
5375        assert_eq!(out.num_columns(), 2);
5376    }
5377
5378    #[test]
5379    fn test_read_row_numbers() {
5380        let file = write_parquet_from_iter(vec![(
5381            "value",
5382            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5383        )]);
5384        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5385
5386        let row_number_field = Arc::new(
5387            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5388        );
5389
5390        let options = ArrowReaderOptions::new()
5391            .with_schema(Arc::new(Schema::new(supplied_fields)))
5392            .with_virtual_columns(vec![row_number_field.clone()])
5393            .unwrap();
5394        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5395            file.try_clone().unwrap(),
5396            options,
5397        )
5398        .expect("reader builder with schema")
5399        .build()
5400        .expect("reader with schema");
5401
5402        let batch = arrow_reader.next().unwrap().unwrap();
5403        let schema = Arc::new(Schema::new(vec![
5404            Field::new("value", ArrowDataType::Int64, false),
5405            (*row_number_field).clone(),
5406        ]));
5407
5408        assert_eq!(batch.schema(), schema);
5409        assert_eq!(batch.num_columns(), 2);
5410        assert_eq!(batch.num_rows(), 3);
5411        assert_eq!(
5412            batch
5413                .column(0)
5414                .as_primitive::<types::Int64Type>()
5415                .iter()
5416                .collect::<Vec<_>>(),
5417            vec![Some(1), Some(2), Some(3)]
5418        );
5419        assert_eq!(
5420            batch
5421                .column(1)
5422                .as_primitive::<types::Int64Type>()
5423                .iter()
5424                .collect::<Vec<_>>(),
5425            vec![Some(0), Some(1), Some(2)]
5426        );
5427    }
5428
5429    #[test]
5430    fn test_read_only_row_numbers() {
5431        let file = write_parquet_from_iter(vec![(
5432            "value",
5433            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5434        )]);
5435        let row_number_field = Arc::new(
5436            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5437        );
5438        let options = ArrowReaderOptions::new()
5439            .with_virtual_columns(vec![row_number_field.clone()])
5440            .unwrap();
5441        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5442        let num_columns = metadata
5443            .metadata
5444            .file_metadata()
5445            .schema_descr()
5446            .num_columns();
5447
5448        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5449            .with_projection(ProjectionMask::none(num_columns))
5450            .build()
5451            .expect("reader with schema");
5452
5453        let batch = arrow_reader.next().unwrap().unwrap();
5454        let schema = Arc::new(Schema::new(vec![row_number_field]));
5455
5456        assert_eq!(batch.schema(), schema);
5457        assert_eq!(batch.num_columns(), 1);
5458        assert_eq!(batch.num_rows(), 3);
5459        assert_eq!(
5460            batch
5461                .column(0)
5462                .as_primitive::<types::Int64Type>()
5463                .iter()
5464                .collect::<Vec<_>>(),
5465            vec![Some(0), Some(1), Some(2)]
5466        );
5467    }
5468
5469    #[test]
5470    fn test_read_row_numbers_row_group_order() -> Result<()> {
5471        // Make a parquet file with 100 rows split across 2 row groups
5472        let array = Int64Array::from_iter_values(5000..5100);
5473        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5474        let mut buffer = Vec::new();
5475        let options = WriterProperties::builder()
5476            .set_max_row_group_row_count(Some(50))
5477            .build();
5478        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5479        // write in 10 row batches as the size limits are enforced after each batch
5480        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5481            writer.write(&batch_chunk)?;
5482        }
5483        writer.close()?;
5484
5485        let row_number_field = Arc::new(
5486            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5487        );
5488
5489        let buffer = Bytes::from(buffer);
5490
5491        let options =
5492            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5493
5494        // read out with normal options
5495        let arrow_reader =
5496            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5497                .build()?;
5498
5499        assert_eq!(
5500            ValuesAndRowNumbers {
5501                values: (5000..5100).collect(),
5502                row_numbers: (0..100).collect()
5503            },
5504            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5505        );
5506
5507        // Now read, out of order row groups
5508        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5509            .with_row_groups(vec![1, 0])
5510            .build()?;
5511
5512        assert_eq!(
5513            ValuesAndRowNumbers {
5514                values: (5050..5100).chain(5000..5050).collect(),
5515                row_numbers: (50..100).chain(0..50).collect(),
5516            },
5517            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5518        );
5519
5520        Ok(())
5521    }
5522
5523    #[derive(Debug, PartialEq)]
5524    struct ValuesAndRowNumbers {
5525        values: Vec<i64>,
5526        row_numbers: Vec<i64>,
5527    }
5528    impl ValuesAndRowNumbers {
5529        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5530            let mut values = vec![];
5531            let mut row_numbers = vec![];
5532            for batch in reader {
5533                let batch = batch.expect("Could not read batch");
5534                values.extend(
5535                    batch
5536                        .column_by_name("col")
5537                        .expect("Could not get col column")
5538                        .as_primitive::<arrow::datatypes::Int64Type>()
5539                        .iter()
5540                        .map(|v| v.expect("Could not get value")),
5541                );
5542
5543                row_numbers.extend(
5544                    batch
5545                        .column_by_name("row_number")
5546                        .expect("Could not get row_number column")
5547                        .as_primitive::<arrow::datatypes::Int64Type>()
5548                        .iter()
5549                        .map(|v| v.expect("Could not get row number"))
5550                        .collect::<Vec<_>>(),
5551                );
5552            }
5553            Self {
5554                values,
5555                row_numbers,
5556            }
5557        }
5558    }
5559
5560    #[test]
5561    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5562        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5563        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5564        assert_eq!(
5565            ArrowReaderOptions::new()
5566                .with_virtual_columns(vec![regular_field])
5567                .unwrap_err()
5568                .to_string(),
5569            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5570        );
5571    }
5572
5573    #[test]
5574    fn test_row_numbers_with_multiple_row_groups() {
5575        test_row_numbers_with_multiple_row_groups_helper(
5576            false,
5577            |path, selection, _row_filter, batch_size| {
5578                let file = File::open(path).unwrap();
5579                let row_number_field = Arc::new(
5580                    Field::new("row_number", ArrowDataType::Int64, false)
5581                        .with_extension_type(RowNumber),
5582                );
5583                let options = ArrowReaderOptions::new()
5584                    .with_virtual_columns(vec![row_number_field])
5585                    .unwrap();
5586                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5587                    .unwrap()
5588                    .with_row_selection(selection)
5589                    .with_batch_size(batch_size)
5590                    .build()
5591                    .expect("Could not create reader");
5592                reader
5593                    .collect::<Result<Vec<_>, _>>()
5594                    .expect("Could not read")
5595            },
5596        );
5597    }
5598
5599    #[test]
5600    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5601        test_row_numbers_with_multiple_row_groups_helper(
5602            true,
5603            |path, selection, row_filter, batch_size| {
5604                let file = File::open(path).unwrap();
5605                let row_number_field = Arc::new(
5606                    Field::new("row_number", ArrowDataType::Int64, false)
5607                        .with_extension_type(RowNumber),
5608                );
5609                let options = ArrowReaderOptions::new()
5610                    .with_virtual_columns(vec![row_number_field])
5611                    .unwrap();
5612                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5613                    .unwrap()
5614                    .with_row_selection(selection)
5615                    .with_batch_size(batch_size)
5616                    .with_row_filter(row_filter.expect("No filter"))
5617                    .build()
5618                    .expect("Could not create reader");
5619                reader
5620                    .collect::<Result<Vec<_>, _>>()
5621                    .expect("Could not read")
5622            },
5623        );
5624    }
5625
5626    #[test]
5627    fn test_read_row_group_indices() {
5628        // create a parquet file with 3 row groups, 2 rows each
5629        let array1 = Int64Array::from(vec![1, 2]);
5630        let array2 = Int64Array::from(vec![3, 4]);
5631        let array3 = Int64Array::from(vec![5, 6]);
5632
5633        let batch1 =
5634            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5635        let batch2 =
5636            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5637        let batch3 =
5638            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5639
5640        let mut buffer = Vec::new();
5641        let options = WriterProperties::builder()
5642            .set_max_row_group_row_count(Some(2))
5643            .build();
5644        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5645        writer.write(&batch1).unwrap();
5646        writer.write(&batch2).unwrap();
5647        writer.write(&batch3).unwrap();
5648        writer.close().unwrap();
5649
5650        let file = Bytes::from(buffer);
5651        let row_group_index_field = Arc::new(
5652            Field::new("row_group_index", ArrowDataType::Int64, false)
5653                .with_extension_type(RowGroupIndex),
5654        );
5655
5656        let options = ArrowReaderOptions::new()
5657            .with_virtual_columns(vec![row_group_index_field.clone()])
5658            .unwrap();
5659        let mut arrow_reader =
5660            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5661                .expect("reader builder with virtual columns")
5662                .build()
5663                .expect("reader with virtual columns");
5664
5665        let batch = arrow_reader.next().unwrap().unwrap();
5666
5667        assert_eq!(batch.num_columns(), 2);
5668        assert_eq!(batch.num_rows(), 6);
5669
5670        assert_eq!(
5671            batch
5672                .column(0)
5673                .as_primitive::<types::Int64Type>()
5674                .iter()
5675                .collect::<Vec<_>>(),
5676            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5677        );
5678
5679        assert_eq!(
5680            batch
5681                .column(1)
5682                .as_primitive::<types::Int64Type>()
5683                .iter()
5684                .collect::<Vec<_>>(),
5685            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5686        );
5687    }
5688
5689    #[test]
5690    fn test_read_only_row_group_indices() {
5691        let array1 = Int64Array::from(vec![1, 2, 3]);
5692        let array2 = Int64Array::from(vec![4, 5]);
5693
5694        let batch1 =
5695            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5696        let batch2 =
5697            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5698
5699        let mut buffer = Vec::new();
5700        let options = WriterProperties::builder()
5701            .set_max_row_group_row_count(Some(3))
5702            .build();
5703        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5704        writer.write(&batch1).unwrap();
5705        writer.write(&batch2).unwrap();
5706        writer.close().unwrap();
5707
5708        let file = Bytes::from(buffer);
5709        let row_group_index_field = Arc::new(
5710            Field::new("row_group_index", ArrowDataType::Int64, false)
5711                .with_extension_type(RowGroupIndex),
5712        );
5713
5714        let options = ArrowReaderOptions::new()
5715            .with_virtual_columns(vec![row_group_index_field.clone()])
5716            .unwrap();
5717        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5718        let num_columns = metadata
5719            .metadata
5720            .file_metadata()
5721            .schema_descr()
5722            .num_columns();
5723
5724        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5725            .with_projection(ProjectionMask::none(num_columns))
5726            .build()
5727            .expect("reader with virtual columns only");
5728
5729        let batch = arrow_reader.next().unwrap().unwrap();
5730        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5731
5732        assert_eq!(batch.schema(), schema);
5733        assert_eq!(batch.num_columns(), 1);
5734        assert_eq!(batch.num_rows(), 5);
5735
5736        assert_eq!(
5737            batch
5738                .column(0)
5739                .as_primitive::<types::Int64Type>()
5740                .iter()
5741                .collect::<Vec<_>>(),
5742            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5743        );
5744    }
5745
5746    #[test]
5747    fn test_read_row_group_indices_with_selection() -> Result<()> {
5748        let mut buffer = Vec::new();
5749        let options = WriterProperties::builder()
5750            .set_max_row_group_row_count(Some(10))
5751            .build();
5752
5753        let schema = Arc::new(Schema::new(vec![Field::new(
5754            "value",
5755            ArrowDataType::Int64,
5756            false,
5757        )]));
5758
5759        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5760
5761        // write out 3 batches of 10 rows each
5762        for i in 0..3 {
5763            let start = i * 10;
5764            let array = Int64Array::from_iter_values(start..start + 10);
5765            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5766            writer.write(&batch)?;
5767        }
5768        writer.close()?;
5769
5770        let file = Bytes::from(buffer);
5771        let row_group_index_field = Arc::new(
5772            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5773        );
5774
5775        let options =
5776            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5777
5778        // test row groups are read in reverse order
5779        let arrow_reader =
5780            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5781                .with_row_groups(vec![2, 1, 0])
5782                .build()?;
5783
5784        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5785        let combined = concat_batches(&batches[0].schema(), &batches)?;
5786
5787        let values = combined.column(0).as_primitive::<types::Int64Type>();
5788        let first_val = values.value(0);
5789        let last_val = values.value(combined.num_rows() - 1);
5790        // first row from rg 2
5791        assert_eq!(first_val, 20);
5792        // the last row from rg 0
5793        assert_eq!(last_val, 9);
5794
5795        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5796        assert_eq!(rg_indices.value(0), 2);
5797        assert_eq!(rg_indices.value(10), 1);
5798        assert_eq!(rg_indices.value(20), 0);
5799
5800        Ok(())
5801    }
5802
5803    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5804        use_filter: bool,
5805        test_case: F,
5806    ) where
5807        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5808    {
5809        let seed: u64 = random();
5810        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5811        let mut rng = StdRng::seed_from_u64(seed);
5812
5813        use tempfile::TempDir;
5814        let tempdir = TempDir::new().expect("Could not create temp dir");
5815
5816        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5817
5818        let path = tempdir.path().join("test.parquet");
5819        std::fs::write(&path, bytes).expect("Could not write file");
5820
5821        let mut case = vec![];
5822        let mut remaining = metadata.file_metadata().num_rows();
5823        while remaining > 0 {
5824            let row_count = rng.random_range(1..=remaining);
5825            remaining -= row_count;
5826            case.push(RowSelector {
5827                row_count: row_count as usize,
5828                skip: rng.random_bool(0.5),
5829            });
5830        }
5831
5832        let filter = use_filter.then(|| {
5833            let filter = (0..metadata.file_metadata().num_rows())
5834                .map(|_| rng.random_bool(0.99))
5835                .collect::<Vec<_>>();
5836            let mut filter_offset = 0;
5837            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5838                ProjectionMask::all(),
5839                move |b| {
5840                    let array = BooleanArray::from_iter(
5841                        filter
5842                            .iter()
5843                            .skip(filter_offset)
5844                            .take(b.num_rows())
5845                            .map(|x| Some(*x)),
5846                    );
5847                    filter_offset += b.num_rows();
5848                    Ok(array)
5849                },
5850            ))])
5851        });
5852
5853        let selection = RowSelection::from(case);
5854        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5855
5856        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5857            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5858            return;
5859        }
5860        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5861            .expect("Failed to concatenate");
5862        // assert_eq!(selection.row_count(), actual.num_rows());
5863        let values = actual
5864            .column(0)
5865            .as_primitive::<types::Int64Type>()
5866            .iter()
5867            .collect::<Vec<_>>();
5868        let row_numbers = actual
5869            .column(1)
5870            .as_primitive::<types::Int64Type>()
5871            .iter()
5872            .collect::<Vec<_>>();
5873        assert_eq!(
5874            row_numbers
5875                .into_iter()
5876                .map(|number| number.map(|number| number + 1))
5877                .collect::<Vec<_>>(),
5878            values
5879        );
5880    }
5881
5882    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5883        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5884            "value",
5885            ArrowDataType::Int64,
5886            false,
5887        )])));
5888
5889        let mut buf = Vec::with_capacity(1024);
5890        let mut writer =
5891            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5892
5893        let mut values = 1..=rng.random_range(1..4096);
5894        while !values.is_empty() {
5895            let batch_values = values
5896                .by_ref()
5897                .take(rng.random_range(1..4096))
5898                .collect::<Vec<_>>();
5899            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5900            let batch =
5901                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5902            writer.write(&batch).expect("Could not write batch");
5903            writer.flush().expect("Could not flush");
5904        }
5905        let metadata = writer.close().expect("Could not close writer");
5906
5907        (Bytes::from(buf), metadata)
5908    }
5909}