Skip to main content

parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Default batch size for reading parquet files
61pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
63/// Builder for constructing Parquet readers that decode into [Apache Arrow]
64/// arrays.
65///
66/// Most users should use one of the following specializations:
67///
68/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
69/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
70/// * decoder API: [`ParquetPushDecoderBuilder`]
71///
72/// # Features
73/// * Projection pushdown: [`Self::with_projection`]
74/// * Cached metadata: [`ArrowReaderMetadata::load`]
75/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
76/// * Row group filtering: [`Self::with_row_groups`]
77/// * Range filtering: [`Self::with_row_selection`]
78/// * Row level filtering: [`Self::with_row_filter`]
79///
80/// # Implementing Predicate Pushdown
81///
82/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
83/// process, which is efficient and allows the most low level optimizations.
84///
85/// However, most Parquet based systems will apply filters at many steps prior
86/// to decoding such as pruning files, row groups and data pages. This crate
87/// provides the low level APIs needed to implement such filtering, but does not
88/// include any logic to actually evaluate predicates. For example:
89///
90/// * [`Self::with_row_groups`] for Row Group pruning
91/// * [`Self::with_row_selection`] for data page pruning
92/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
93///
94/// The rationale for this design is that implementing predicate pushdown is a
95/// complex topic and varies significantly from system to system. For example
96///
97/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
98/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
99/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
100/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
101///
102/// You can read more about this design in the [Querying Parquet with
103/// Millisecond Latency] Arrow blog post.
104///
105/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
106/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
107/// [Apache Arrow]: https://arrow.apache.org/
108/// [`StatisticsConverter`]: statistics::StatisticsConverter
109/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
    /// The "input" to read parquet data from.
    ///
    /// Note in the case of the [`ParquetPushDecoderBuilder`] there is no
    /// underlying reader; the input is instead [`PushDecoderInput`], the buffer that
    /// caller-pushed bytes accumulate in.
    ///
    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
    /// [`PushDecoderInput`]: crate::arrow::push_decoder::PushDecoderInput
    pub(crate) input: T,

    /// The [`ParquetMetaData`] of the file, returned by [`Self::metadata`]
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// The Arrow schema of the file, returned by [`Self::schema`]
    pub(crate) schema: SchemaRef,

    /// Optional [`ParquetField`], copied from the `ArrowReaderMetadata`
    /// passed to `new_builder`
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Size of the [`RecordBatch`]es to produce, see [`Self::with_batch_size`]
    pub(crate) batch_size: usize,

    /// Row group indexes to read, see [`Self::with_row_groups`]
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Columns to read, see [`Self::with_projection`]
    pub(crate) projection: ProjectionMask,

    /// Optional [`RowFilter`], see [`Self::with_row_filter`]
    pub(crate) filter: Option<RowFilter>,

    /// Optional [`RowSelection`], see [`Self::with_row_selection`]
    pub(crate) selection: Option<RowSelection>,

    /// How row selections are materialised, see [`Self::with_row_selection_policy`]
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Optional limit on the number of rows read, see [`Self::with_limit`]
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip, see [`Self::with_offset`]
    pub(crate) offset: Option<usize>,

    /// Metrics collection configuration, see [`Self::with_metrics`]
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum predicate cache size in bytes, see
    /// [`Self::with_max_predicate_cache_size`]
    pub(crate) max_predicate_cache_size: usize,
}
147
148impl<T: Debug> Debug for ArrowReaderBuilder<T> {
149    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("ArrowReaderBuilder<T>")
151            .field("input", &self.input)
152            .field("metadata", &self.metadata)
153            .field("schema", &self.schema)
154            .field("fields", &self.fields)
155            .field("batch_size", &self.batch_size)
156            .field("row_groups", &self.row_groups)
157            .field("projection", &self.projection)
158            .field("filter", &self.filter)
159            .field("selection", &self.selection)
160            .field("row_selection_policy", &self.row_selection_policy)
161            .field("limit", &self.limit)
162            .field("offset", &self.offset)
163            .field("metrics", &self.metrics)
164            .finish()
165    }
166}
167
168impl<T> ArrowReaderBuilder<T> {
169    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
170        Self {
171            input,
172            metadata: metadata.metadata,
173            schema: metadata.schema,
174            fields: metadata.fields,
175            batch_size: DEFAULT_BATCH_SIZE,
176            row_groups: None,
177            projection: ProjectionMask::all(),
178            filter: None,
179            selection: None,
180            row_selection_policy: RowSelectionPolicy::default(),
181            limit: None,
182            offset: None,
183            metrics: ArrowReaderMetrics::Disabled,
184            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
185        }
186    }
187
188    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
189    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
190        &self.metadata
191    }
192
193    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
194    pub fn parquet_schema(&self) -> &SchemaDescriptor {
195        self.metadata.file_metadata().schema_descr()
196    }
197
198    /// Returns the arrow [`SchemaRef`] for this parquet file
199    pub fn schema(&self) -> &SchemaRef {
200        &self.schema
201    }
202
203    /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`].
204    ///
205    /// This may be used as a hint for internal allocations, but does not
206    /// guarantee exact internal buffer capacities.
207    ///
208    /// If `batch_size` is more than the file row count, use the file row count.
209    pub fn with_batch_size(self, batch_size: usize) -> Self {
210        // Try to avoid allocate large buffer
211        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
212        Self { batch_size, ..self }
213    }
214
215    /// Only read data from the provided row group indexes
216    ///
217    /// This is also called row group filtering
218    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
219        Self {
220            row_groups: Some(row_groups),
221            ..self
222        }
223    }
224
225    /// Only read data from the provided column indexes
226    pub fn with_projection(self, mask: ProjectionMask) -> Self {
227        Self {
228            projection: mask,
229            ..self
230        }
231    }
232
233    /// Configure how row selections should be materialised during execution
234    ///
235    /// See [`RowSelectionPolicy`] for more details
236    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
237        Self {
238            row_selection_policy: policy,
239            ..self
240        }
241    }
242
243    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
244    /// data into memory.
245    ///
246    /// This feature is used to restrict which rows are decoded within row
247    /// groups, skipping ranges of rows that are not needed. Such selections
248    /// could be determined by evaluating predicates against the parquet page
249    /// [`Index`] or some other external information available to a query
250    /// engine.
251    ///
252    /// # Notes
253    ///
254    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
255    /// applying the row selection, and therefore rows from skipped row groups
256    /// should not be included in the [`RowSelection`] (see example below)
257    ///
258    /// It is recommended to enable writing the page index if using this
259    /// functionality, to allow more efficient skipping over data pages. See
260    /// [`ArrowReaderOptions::with_page_index`].
261    ///
262    /// # Example
263    ///
264    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
265    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
266    /// row group 3:
267    ///
268    /// ```text
269    ///   Row Group 0, 1000 rows (selected)
270    ///   Row Group 1, 1000 rows (skipped)
271    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
272    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
273    /// ```
274    ///
275    /// You could pass the following [`RowSelection`]:
276    ///
277    /// ```text
278    ///  Select 1000    (scan all rows in row group 0)
279    ///  Skip 50        (skip the first 50 rows in row group 2)
280    ///  Select 50      (scan rows 50-100 in row group 2)
281    ///  Skip 900       (skip the remaining rows in row group 2)
282    ///  Skip 200       (skip the first 200 rows in row group 3)
283    ///  Select 100     (scan rows 200-300 in row group 3)
284    ///  Skip 700       (skip the remaining rows in row group 3)
285    /// ```
286    /// Note there is no entry for the (entirely) skipped row group 1.
287    ///
288    /// Note you can represent the same selection with fewer entries. Instead of
289    ///
290    /// ```text
291    ///  Skip 900       (skip the remaining rows in row group 2)
292    ///  Skip 200       (skip the first 200 rows in row group 3)
293    /// ```
294    ///
295    /// you could use
296    ///
297    /// ```text
298    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
299    /// ```
300    ///
301    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
302    pub fn with_row_selection(self, selection: RowSelection) -> Self {
303        Self {
304            selection: Some(selection),
305            ..self
306        }
307    }
308
309    /// Provide a [`RowFilter`] to skip decoding rows
310    ///
311    /// Row filters are applied after row group selection and row selection
312    ///
313    /// It is recommended to enable reading the page index if using this functionality, to allow
314    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
315    ///
316    /// See the [blog post on late materialization] for a more technical explanation.
317    ///
318    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
319    ///
320    /// # Example
321    /// ```rust
322    /// # use std::fs::File;
323    /// # use arrow_array::Int32Array;
324    /// # use parquet::arrow::ProjectionMask;
325    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
326    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
327    /// # let testdata = arrow::util::test_util::parquet_test_data();
328    /// # let path = format!("{testdata}/alltypes_plain.parquet");
329    /// # let file = File::open(&path)?;
330    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
331    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
332    /// // Create predicate that evaluates `int_col != 1`.
333    /// // `int_col` column has index 4 (zero based) in the schema
334    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
335    /// // Only the projection columns are passed to the predicate so
336    /// // int_col is column 0 in the predicate
337    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
338    ///     let int_col = batch.column(0);
339    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
340    /// });
341    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
342    /// // The filter will be invoked during the reading process
343    /// let reader = builder.with_row_filter(row_filter).build()?;
344    /// # for b in reader { let _ = b?; }
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub fn with_row_filter(self, filter: RowFilter) -> Self {
349        Self {
350            filter: Some(filter),
351            ..self
352        }
353    }
354
355    /// Provide a limit to the number of rows to be read
356    ///
357    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
358    /// allowing it to limit the final set of rows decoded after any pushed down predicates
359    ///
360    /// It is recommended to enable reading the page index if using this functionality, to allow
361    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
362    pub fn with_limit(self, limit: usize) -> Self {
363        Self {
364            limit: Some(limit),
365            ..self
366        }
367    }
368
369    /// Provide an offset to skip over the given number of rows
370    ///
371    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
372    /// allowing it to skip rows after any pushed down predicates
373    ///
374    /// It is recommended to enable reading the page index if using this functionality, to allow
375    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
376    pub fn with_offset(self, offset: usize) -> Self {
377        Self {
378            offset: Some(offset),
379            ..self
380        }
381    }
382
383    /// Specify metrics collection during reading
384    ///
385    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
386    /// clone of the provided metrics to the builder.
387    ///
388    /// For example:
389    ///
390    /// ```rust
391    /// # use std::sync::Arc;
392    /// # use bytes::Bytes;
393    /// # use arrow_array::{Int32Array, RecordBatch};
394    /// # use arrow_schema::{DataType, Field, Schema};
395    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
396    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
397    /// # use parquet::arrow::ArrowWriter;
398    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
399    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
400    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
401    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
402    /// # writer.write(&batch).unwrap();
403    /// # writer.close().unwrap();
404    /// # let file = Bytes::from(file);
405    /// // Create metrics object to pass into the reader
406    /// let metrics = ArrowReaderMetrics::enabled();
407    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
408    ///   // Configure the builder to use the metrics by passing a clone
409    ///   .with_metrics(metrics.clone())
410    ///   // Build the reader
411    ///   .build().unwrap();
412    /// // .. read data from the reader ..
413    ///
414    /// // check the metrics
415    /// assert!(metrics.records_read_from_inner().is_some());
416    /// ```
417    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
418        Self { metrics, ..self }
419    }
420
421    /// Set the maximum size (per row group) of the predicate cache in bytes for
422    /// the async decoder.
423    ///
424    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
425    /// unlimited cache size.
426    ///
427    /// This cache is used to store decoded arrays that are used in
428    /// predicate evaluation ([`Self::with_row_filter`]).
429    ///
430    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
431    /// [this ticket] for more details and alternatives.
432    ///
433    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
434    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
435    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
436        Self {
437            max_predicate_cache_size,
438            ..self
439        }
440    }
441}
442
443/// Options that control how [`ParquetMetaData`] is read when constructing
444/// an Arrow reader.
445///
446/// To use these options, pass them to one of the following methods:
447/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
448/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
449///
450/// For fine-grained control over metadata loading, use
451/// [`ArrowReaderMetadata::load`] to load metadata with these options,
452///
453/// See [`ArrowReaderBuilder`] for how to configure how the column data
454/// is then read from the file, including projection and filter pushdown
455///
456/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
    ///
    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
    supplied_schema: Option<SchemaRef>,

    /// Policy for the Parquet `ColumnIndex`, see [`Self::with_column_index_policy`]
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for the Parquet `OffsetIndex`, see [`Self::with_offset_index_policy`]
    pub(crate) offset_index: PageIndexPolicy,

    /// Options to control reading of Parquet metadata
    metadata_options: ParquetMetaDataOptions,
    /// If encryption is enabled, the file decryption properties can be provided
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Fields describing virtual columns.
    // NOTE(review): presumably the columns added to the output by the
    // virtual-column option of this type; its setter is not visible here,
    // confirm against that method.
    virtual_columns: Vec<FieldRef>,
}
478
479impl ArrowReaderOptions {
480    /// Create a new [`ArrowReaderOptions`] with the default settings
481    pub fn new() -> Self {
482        Self::default()
483    }
484
485    /// Skip decoding the embedded arrow metadata (defaults to `false`)
486    ///
487    /// Parquet files generated by some writers may contain embedded arrow
488    /// schema and metadata.
489    /// This may not be correct or compatible with your system,
490    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
491    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
492        Self {
493            skip_arrow_metadata,
494            ..self
495        }
496    }
497
498    /// Provide a schema hint to use when reading the Parquet file.
499    ///
500    /// If provided, this schema takes precedence over any arrow schema embedded
501    /// in the metadata (see the [`arrow`] documentation for more details).
502    ///
503    /// If the provided schema is not compatible with the data stored in the
504    /// parquet file schema, an error will be returned when constructing the
505    /// builder.
506    ///
507    /// This option is only required if you want to explicitly control the
508    /// conversion of Parquet types to Arrow types, such as casting a column to
509    /// a different type. For example, if you wanted to read an Int64 in
510    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
511    ///
512    /// [`arrow`]: crate::arrow
513    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
514    ///
515    /// # Notes
516    ///
517    /// The provided schema must have the same number of columns as the parquet schema and
518    /// the column names must be the same.
519    ///
520    /// # Example
521    /// ```
522    /// # use std::sync::Arc;
523    /// # use bytes::Bytes;
524    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
525    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
526    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
527    /// # use parquet::arrow::ArrowWriter;
528    /// // Write data - schema is inferred from the data to be Int32
529    /// let mut file = Vec::new();
530    /// let batch = RecordBatch::try_from_iter(vec![
531    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
532    /// ]).unwrap();
533    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
534    /// writer.write(&batch).unwrap();
535    /// writer.close().unwrap();
536    /// let file = Bytes::from(file);
537    ///
538    /// // Read the file back.
539    /// // Supply a schema that interprets the Int32 column as a Timestamp.
540    /// let supplied_schema = Arc::new(Schema::new(vec![
541    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
542    /// ]));
543    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
544    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
545    ///     file.clone(),
546    ///     options
547    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
548    ///
549    /// // Create the reader and read the data using the supplied schema.
550    /// let mut reader = builder.build().unwrap();
551    /// let _batch = reader.next().unwrap().unwrap();
552    /// ```
553    ///
554    /// # Example: Preserving Dictionary Encoding
555    ///
556    /// By default, Parquet string columns are read as `Utf8Array` (or `LargeUtf8Array`),
557    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
558    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
559    ///
560    /// ```
561    /// # use std::sync::Arc;
562    /// # use bytes::Bytes;
563    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
564    /// # use arrow_schema::{DataType, Field, Schema};
565    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
566    /// # use parquet::arrow::ArrowWriter;
567    /// // Write a Parquet file with string data
568    /// let mut file = Vec::new();
569    /// let schema = Arc::new(Schema::new(vec![
570    ///     Field::new("city", DataType::Utf8, false)
571    /// ]));
572    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
573    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
574    ///
575    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
576    /// writer.write(&batch).unwrap();
577    /// writer.close().unwrap();
578    /// let file = Bytes::from(file);
579    ///
580    /// // Read the file back, requesting dictionary encoding preservation
581    /// let dict_schema = Arc::new(Schema::new(vec![
582    ///     Field::new("city", DataType::Dictionary(
583    ///         Box::new(DataType::Int32),
584    ///         Box::new(DataType::Utf8)
585    ///     ), false)
586    /// ]));
587    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
588    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
589    ///     file.clone(),
590    ///     options
591    /// ).unwrap();
592    ///
593    /// let mut reader = builder.build().unwrap();
594    /// let batch = reader.next().unwrap().unwrap();
595    ///
596    /// // The column is now a DictionaryArray
597    /// assert!(matches!(
598    ///     batch.column(0).data_type(),
599    ///     DataType::Dictionary(_, _)
600    /// ));
601    /// ```
602    ///
603    /// **Note**: Dictionary encoding preservation works best when:
604    /// 1. The original column was dictionary encoded (the default for string columns)
605    /// 2. There are a small number of distinct values
606    pub fn with_schema(self, schema: SchemaRef) -> Self {
607        Self {
608            supplied_schema: Some(schema),
609            skip_arrow_metadata: true,
610            ..self
611        }
612    }
613
614    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
615    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
616    ///
617    /// The `PageIndex` can be used to push down predicates to the parquet scan,
618    /// potentially eliminating unnecessary IO, by some query engines.
619    ///
620    /// If this is enabled, [`ParquetMetaData::column_index`] and
621    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
622    /// information is present in the file.
623    ///
624    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
625    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
626    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
627    pub fn with_page_index(self, page_index: bool) -> Self {
628        self.with_page_index_policy(PageIndexPolicy::from(page_index))
629    }
630
631    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
632    ///
633    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
634    /// This method sets the same policy for both. For fine-grained control, use
635    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
636    ///
637    /// See [`Self::with_page_index`] for more details on page indexes.
638    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
639        self.with_column_index_policy(policy)
640            .with_offset_index_policy(policy)
641    }
642
643    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
644    ///
645    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
646    /// for predicate pushdown and page-level pruning.
647    ///
648    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
649    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
650        self.column_index = policy;
651        self
652    }
653
654    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
655    ///
656    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
657    /// efficient page-level skipping and random access within column chunks.
658    ///
659    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
660    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
661        self.offset_index = policy;
662        self
663    }
664
665    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
666    /// footer will be skipped.
667    ///
668    /// This can be used to avoid reparsing the schema from the file when it is
669    /// already known.
670    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
671        self.metadata_options.set_schema(schema);
672        self
673    }
674
675    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
676    /// (defaults to `false`).
677    ///
678    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
679    /// might be desirable.
680    ///
681    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
682    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
683    /// [`encoding_stats`]:
684    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
685    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
686        self.metadata_options.set_encoding_stats_as_mask(val);
687        self
688    }
689
690    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
691    ///
692    /// [`encoding_stats`]:
693    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
694    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
695        self.metadata_options.set_encoding_stats_policy(policy);
696        self
697    }
698
699    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
700    ///
701    /// [`statistics`]:
702    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
703    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
704        self.metadata_options.set_column_stats_policy(policy);
705        self
706    }
707
708    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
709    ///
710    /// [`size_statistics`]:
711    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
712    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
713        self.metadata_options.set_size_stats_policy(policy);
714        self
715    }
716
717    /// Provide the file decryption properties to use when reading encrypted parquet files.
718    ///
719    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
720    #[cfg(feature = "encryption")]
721    pub fn with_file_decryption_properties(
722        self,
723        file_decryption_properties: Arc<FileDecryptionProperties>,
724    ) -> Self {
725        Self {
726            file_decryption_properties: Some(file_decryption_properties),
727            ..self
728        }
729    }
730
731    /// Include virtual columns in the output.
732    ///
733    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
734    ///
735    /// # Example
736    /// ```
737    /// # use std::sync::Arc;
738    /// # use bytes::Bytes;
739    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
740    /// # use arrow_schema::{DataType, Field, Schema};
741    /// # use parquet::arrow::{ArrowWriter, RowNumber};
742    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
743    /// #
744    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
745    /// // Create a simple record batch with some data
746    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
747    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
748    ///
749    /// // Write the batch to an in-memory buffer
750    /// let mut file = Vec::new();
751    /// let mut writer = ArrowWriter::try_new(
752    ///     &mut file,
753    ///     batch.schema(),
754    ///     None
755    /// )?;
756    /// writer.write(&batch)?;
757    /// writer.close()?;
758    /// let file = Bytes::from(file);
759    ///
760    /// // Create a virtual column for row numbers
761    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
762    ///     .with_extension_type(RowNumber));
763    ///
764    /// // Configure options with virtual columns
765    /// let options = ArrowReaderOptions::new()
766    ///     .with_virtual_columns(vec![row_number_field])?;
767    ///
768    /// // Create a reader with the options
769    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
770    ///     file,
771    ///     options
772    /// )?
773    /// .build()?;
774    ///
775    /// // Read the batch - it will include both the original column and the virtual row_number column
776    /// let result_batch = reader.next().unwrap()?;
777    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
778    /// assert_eq!(result_batch.num_rows(), 3);
779    /// #
780    /// # Ok(())
781    /// # }
782    /// ```
783    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
784        // Validate that all fields are virtual columns
785        for field in &virtual_columns {
786            if !is_virtual_column(field) {
787                return Err(ParquetError::General(format!(
788                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
789                    field.name()
790                )));
791            }
792        }
793        Ok(Self {
794            virtual_columns,
795            ..self
796        })
797    }
798
799    #[deprecated(
800        since = "57.2.0",
801        note = "Use `column_index_policy` or `offset_index_policy` instead"
802    )]
803    /// Returns whether page index reading is enabled.
804    ///
805    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
806    ///
807    /// This can be set via [`with_page_index`][Self::with_page_index] or
808    /// [`with_page_index_policy`][Self::with_page_index_policy].
809    pub fn page_index(&self) -> bool {
810        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
811    }
812
813    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
814    ///
815    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
816    /// or [`with_page_index_policy`][Self::with_page_index_policy].
817    pub fn offset_index_policy(&self) -> PageIndexPolicy {
818        self.offset_index
819    }
820
821    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
822    ///
823    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
824    /// or [`with_page_index_policy`][Self::with_page_index_policy].
825    pub fn column_index_policy(&self) -> PageIndexPolicy {
826        self.column_index
827    }
828
829    /// Retrieve the currently set metadata decoding options.
830    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
831        &self.metadata_options
832    }
833
834    /// Retrieve the currently set file decryption properties.
835    ///
836    /// This can be set via
837    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
838    #[cfg(feature = "encryption")]
839    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
840        self.file_decryption_properties.as_ref()
841    }
842}
843
/// The metadata necessary to construct a [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several arcs.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet metadata of the file (shared, so cloning only copies the `Arc`)
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema
    pub(crate) schema: SchemaRef,
    /// The Parquet schema (root field), if the schema conversion produced one
    pub(crate) fields: Option<Arc<ParquetField>>,
}
867
868impl ArrowReaderMetadata {
869    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
870    /// and [`ChunkReader`]
871    ///
872    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
873    /// example of how this can be used
874    ///
875    /// # Notes
876    ///
877    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
878    /// `Self::metadata` is missing the page index, this function will attempt
879    /// to load the page index by making an object store request.
880    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
881        let metadata = ParquetMetaDataReader::new()
882            .with_column_index_policy(options.column_index)
883            .with_offset_index_policy(options.offset_index)
884            .with_metadata_options(Some(options.metadata_options.clone()));
885        #[cfg(feature = "encryption")]
886        let metadata = metadata.with_decryption_properties(
887            options.file_decryption_properties.as_ref().map(Arc::clone),
888        );
889        let metadata = metadata.parse_and_finish(reader)?;
890        Self::try_new(Arc::new(metadata), options)
891    }
892
893    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
894    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
895    ///
896    /// # Notes
897    ///
898    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
899    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
900    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
901        match options.supplied_schema {
902            Some(supplied_schema) => Self::with_supplied_schema(
903                metadata,
904                supplied_schema.clone(),
905                &options.virtual_columns,
906            ),
907            None => {
908                let kv_metadata = match options.skip_arrow_metadata {
909                    true => None,
910                    false => metadata.file_metadata().key_value_metadata(),
911                };
912
913                let (schema, fields) = parquet_to_arrow_schema_and_fields(
914                    metadata.file_metadata().schema_descr(),
915                    ProjectionMask::all(),
916                    kv_metadata,
917                    &options.virtual_columns,
918                )?;
919
920                Ok(Self {
921                    metadata,
922                    schema: Arc::new(schema),
923                    fields: fields.map(Arc::new),
924                })
925            }
926        }
927    }
928
929    fn with_supplied_schema(
930        metadata: Arc<ParquetMetaData>,
931        supplied_schema: SchemaRef,
932        virtual_columns: &[FieldRef],
933    ) -> Result<Self> {
934        let parquet_schema = metadata.file_metadata().schema_descr();
935        let field_levels = parquet_to_arrow_field_levels_with_virtual(
936            parquet_schema,
937            ProjectionMask::all(),
938            Some(supplied_schema.fields()),
939            virtual_columns,
940        )?;
941        let fields = field_levels.fields;
942        let inferred_len = fields.len();
943        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
944        // Ensure the supplied schema has the same number of columns as the parquet schema.
945        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
946        // different lengths, but we check here to be safe.
947        if inferred_len != supplied_len {
948            return Err(arrow_err!(format!(
949                "Incompatible supplied Arrow schema: expected {} columns received {}",
950                inferred_len, supplied_len
951            )));
952        }
953
954        let mut errors = Vec::new();
955
956        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
957
958        for (field1, field2) in field_iter {
959            if field1.data_type() != field2.data_type() {
960                errors.push(format!(
961                    "data type mismatch for field {}: requested {} but found {}",
962                    field1.name(),
963                    field1.data_type(),
964                    field2.data_type()
965                ));
966            }
967            if field1.is_nullable() != field2.is_nullable() {
968                errors.push(format!(
969                    "nullability mismatch for field {}: expected {:?} but found {:?}",
970                    field1.name(),
971                    field1.is_nullable(),
972                    field2.is_nullable()
973                ));
974            }
975            if field1.metadata() != field2.metadata() {
976                errors.push(format!(
977                    "metadata mismatch for field {}: expected {:?} but found {:?}",
978                    field1.name(),
979                    field1.metadata(),
980                    field2.metadata()
981                ));
982            }
983        }
984
985        if !errors.is_empty() {
986            let message = errors.join(", ");
987            return Err(ParquetError::ArrowError(format!(
988                "Incompatible supplied Arrow schema: {message}",
989            )));
990        }
991
992        Ok(Self {
993            metadata,
994            schema: supplied_schema,
995            fields: field_levels.levels.map(Arc::new),
996        })
997    }
998
999    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
1000    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
1001        &self.metadata
1002    }
1003
1004    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
1005    pub fn parquet_schema(&self) -> &SchemaDescriptor {
1006        self.metadata.file_metadata().schema_descr()
1007    }
1008
1009    /// Returns the arrow [`SchemaRef`] for this parquet file
1010    pub fn schema(&self) -> &SchemaRef {
1011        &self.schema
1012    }
1013}
1014
1015#[doc(hidden)]
1016// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers from async
1017pub struct SyncReader<T: ChunkReader>(T);
1018
1019impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1020    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1021        f.debug_tuple("SyncReader").field(&self.0).finish()
1022    }
1023}
1024
/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
///
/// This is an alias for [`ArrowReaderBuilder`] whose input is a synchronous reader
/// wrapped in `SyncReader`.
///
/// # See Also
/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
/// * [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1032
1033impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1034    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1035    ///
1036    /// ```
1037    /// # use std::sync::Arc;
1038    /// # use bytes::Bytes;
1039    /// # use arrow_array::{Int32Array, RecordBatch};
1040    /// # use arrow_schema::{DataType, Field, Schema};
1041    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1042    /// # use parquet::arrow::ArrowWriter;
1043    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1044    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1045    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1046    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1047    /// # writer.write(&batch).unwrap();
1048    /// # writer.close().unwrap();
1049    /// # let file = Bytes::from(file);
1050    /// // Build the reader from anything that implements `ChunkReader`
1051    /// // such as a `File`, or `Bytes`
1052    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1053    /// // The builder has access to ParquetMetaData such
1054    /// // as the number and layout of row groups
1055    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1056    /// // Call build to create the reader
1057    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1058    /// // Read data
1059    /// while let Some(batch) = reader.next().transpose()? {
1060    ///     println!("Read {} rows", batch.num_rows());
1061    /// }
1062    /// # Ok::<(), parquet::errors::ParquetError>(())
1063    /// ```
1064    pub fn try_new(reader: T) -> Result<Self> {
1065        Self::try_new_with_options(reader, Default::default())
1066    }
1067
1068    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1069    ///
1070    /// Use this method if you want to control the options for reading the
1071    /// [`ParquetMetaData`]
1072    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1073        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1074        Ok(Self::new_with_metadata(reader, metadata))
1075    }
1076
1077    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1078    ///
1079    /// Use this method if you already have [`ParquetMetaData`] for a file.
1080    /// This interface allows:
1081    ///
1082    /// 1. Loading metadata once and using it to create multiple builders with
1083    ///    potentially different settings or run on different threads
1084    ///
1085    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1086    ///    file each time a reader is constructed.
1087    ///
1088    /// See the docs on [`ArrowReaderMetadata`] for more details
1089    ///
1090    /// # Example
1091    /// ```
1092    /// # use std::fs::metadata;
1093    /// # use std::sync::Arc;
1094    /// # use bytes::Bytes;
1095    /// # use arrow_array::{Int32Array, RecordBatch};
1096    /// # use arrow_schema::{DataType, Field, Schema};
1097    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1098    /// # use parquet::arrow::ArrowWriter;
1099    /// #
1100    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1101    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1102    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1103    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1104    /// # writer.write(&batch).unwrap();
1105    /// # writer.close().unwrap();
1106    /// # let file = Bytes::from(file);
1107    /// #
1108    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1109    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1110    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1111    ///
1112    /// // Should be able to read from both in parallel
1113    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1114    /// ```
1115    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1116        Self::new_builder(SyncReader(input), metadata)
1117    }
1118
1119    /// Read bloom filter for a column in a row group
1120    ///
1121    /// Returns `None` if the column does not have a bloom filter
1122    ///
1123    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
1124    pub fn get_row_group_column_bloom_filter(
1125        &self,
1126        row_group_idx: usize,
1127        column_idx: usize,
1128    ) -> Result<Option<Sbbf>> {
1129        let metadata = self.metadata.row_group(row_group_idx);
1130        let column_metadata = metadata.column(column_idx);
1131
1132        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1133            offset
1134                .try_into()
1135                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1136        } else {
1137            return Ok(None);
1138        };
1139
1140        let buffer = match column_metadata.bloom_filter_length() {
1141            Some(length) => self.input.0.get_bytes(offset, length as usize),
1142            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1143        }?;
1144
1145        let (header, bitset_offset) =
1146            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1147
1148        match header.algorithm {
1149            BloomFilterAlgorithm::BLOCK => {
1150                // this match exists to future proof the singleton algorithm enum
1151            }
1152        }
1153        match header.compression {
1154            BloomFilterCompression::UNCOMPRESSED => {
1155                // this match exists to future proof the singleton compression enum
1156            }
1157        }
1158        match header.hash {
1159            BloomFilterHash::XXHASH => {
1160                // this match exists to future proof the singleton hash enum
1161            }
1162        }
1163
1164        let bitset = match column_metadata.bloom_filter_length() {
1165            Some(_) => buffer.slice(
1166                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1167                    - TryInto::<usize>::try_into(offset).unwrap())..,
1168            ),
1169            None => {
1170                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1171                    ParquetError::General("Bloom filter length is invalid".to_string())
1172                })?;
1173                self.input.0.get_bytes(bitset_offset, bitset_length)?
1174            }
1175        };
1176        Ok(Some(Sbbf::new(&bitset)))
1177    }
1178
1179    /// Build a [`ParquetRecordBatchReader`]
1180    ///
1181    /// Note: this will eagerly evaluate any `RowFilter` before returning
1182    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1183        let Self {
1184            input,
1185            metadata,
1186            schema: _,
1187            fields,
1188            batch_size,
1189            row_groups,
1190            projection,
1191            mut filter,
1192            selection,
1193            row_selection_policy,
1194            limit,
1195            offset,
1196            metrics,
1197            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1198            max_predicate_cache_size: _,
1199        } = self;
1200
1201        // Try to avoid allocate large buffer
1202        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1203
1204        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1205
1206        let reader = ReaderRowGroups {
1207            reader: Arc::new(input.0),
1208            metadata,
1209            row_groups,
1210        };
1211
1212        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1213            .with_selection(selection)
1214            .with_row_selection_policy(row_selection_policy);
1215
1216        // Update selection based on any filters
1217        if let Some(filter) = filter.as_mut() {
1218            for predicate in filter.predicates.iter_mut() {
1219                // break early if we have ruled out all rows
1220                if !plan_builder.selects_any() {
1221                    break;
1222                }
1223
1224                let mut cache_projection = predicate.projection().clone();
1225                cache_projection.intersect(&projection);
1226
1227                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1228                    .with_batch_size(batch_size)
1229                    .with_parquet_metadata(&reader.metadata)
1230                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1231
1232                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1233            }
1234        }
1235
1236        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1237            .with_batch_size(batch_size)
1238            .with_parquet_metadata(&reader.metadata)
1239            .build_array_reader(fields.as_deref(), &projection)?;
1240
1241        let read_plan = plan_builder
1242            .limited(reader.num_rows())
1243            .with_offset(offset)
1244            .with_limit(limit)
1245            .build_limited()
1246            .build();
1247
1248        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1249    }
1250}
1251
1252struct ReaderRowGroups<T: ChunkReader> {
1253    reader: Arc<T>,
1254
1255    metadata: Arc<ParquetMetaData>,
1256    /// Optional list of row group indices to scan
1257    row_groups: Vec<usize>,
1258}
1259
1260impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1261    fn num_rows(&self) -> usize {
1262        let meta = self.metadata.row_groups();
1263        self.row_groups
1264            .iter()
1265            .map(|x| meta[*x].num_rows() as usize)
1266            .sum()
1267    }
1268
1269    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1270        Ok(Box::new(ReaderPageIterator {
1271            column_idx: i,
1272            reader: self.reader.clone(),
1273            metadata: self.metadata.clone(),
1274            row_groups: self.row_groups.clone().into_iter(),
1275        }))
1276    }
1277
1278    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1279        Box::new(
1280            self.row_groups
1281                .iter()
1282                .map(move |i| self.metadata.row_group(*i)),
1283        )
1284    }
1285
1286    fn metadata(&self) -> &ParquetMetaData {
1287        self.metadata.as_ref()
1288    }
1289}
1290
1291struct ReaderPageIterator<T: ChunkReader> {
1292    reader: Arc<T>,
1293    column_idx: usize,
1294    row_groups: std::vec::IntoIter<usize>,
1295    metadata: Arc<ParquetMetaData>,
1296}
1297
1298impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1299    /// Return the next SerializedPageReader
1300    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1301        let rg = self.metadata.row_group(rg_idx);
1302        let column_chunk_metadata = rg.column(self.column_idx);
1303        let offset_index = self.metadata.offset_index();
1304        // `offset_index` may not exist and `i[rg_idx]` will be empty.
1305        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
1306        let page_locations = offset_index
1307            .filter(|i| !i[rg_idx].is_empty())
1308            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1309        let total_rows = rg.num_rows() as usize;
1310        let reader = self.reader.clone();
1311
1312        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1313            .add_crypto_context(
1314                rg_idx,
1315                self.column_idx,
1316                self.metadata.as_ref(),
1317                column_chunk_metadata,
1318            )
1319    }
1320}
1321
1322impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1323    type Item = Result<Box<dyn PageReader>>;
1324
1325    fn next(&mut self) -> Option<Self::Item> {
1326        let rg_idx = self.row_groups.next()?;
1327        let page_reader = self
1328            .next_page_reader(rg_idx)
1329            .map(|page_reader| Box::new(page_reader) as _);
1330        Some(page_reader)
1331    }
1332}
1333
1334impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1335
1336/// Reads Parquet data as Arrow [`RecordBatch`]es
1337///
1338/// This struct implements the [`RecordBatchReader`] trait and is an
1339/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
1340///
1341/// Typically, either reads from a file or an in memory buffer [`Bytes`]
1342///
1343/// Created by [`ParquetRecordBatchReaderBuilder`]
1344///
1345/// [`Bytes`]: bytes::Bytes
1346pub struct ParquetRecordBatchReader {
1347    array_reader: Box<dyn ArrayReader>,
1348    schema: SchemaRef,
1349    read_plan: ReadPlan,
1350}
1351
1352impl Debug for ParquetRecordBatchReader {
1353    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1354        f.debug_struct("ParquetRecordBatchReader")
1355            .field("array_reader", &"...")
1356            .field("schema", &self.schema)
1357            .field("read_plan", &self.read_plan)
1358            .finish()
1359    }
1360}
1361
1362impl Iterator for ParquetRecordBatchReader {
1363    type Item = Result<RecordBatch, ArrowError>;
1364
1365    fn next(&mut self) -> Option<Self::Item> {
1366        self.next_inner()
1367            .map_err(|arrow_err| arrow_err.into())
1368            .transpose()
1369    }
1370}
1371
1372impl ParquetRecordBatchReader {
1373    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1374    /// has reached the end of the file.
1375    ///
1376    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1377    /// simplify error handling with `?`
1378    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1379        let mut read_records = 0;
1380        let batch_size = self.batch_size();
1381        if batch_size == 0 {
1382            return Ok(None);
1383        }
1384        match self.read_plan.row_selection_cursor_mut() {
1385            RowSelectionCursor::Mask(mask_cursor) => {
1386                // Stream the record batch reader using contiguous segments of the selection
1387                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1388                while !mask_cursor.is_empty() {
1389                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1390                        return Ok(None);
1391                    };
1392
1393                    if mask_chunk.initial_skip > 0 {
1394                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1395                        if skipped != mask_chunk.initial_skip {
1396                            return Err(general_err!(
1397                                "failed to skip rows, expected {}, got {}",
1398                                mask_chunk.initial_skip,
1399                                skipped
1400                            ));
1401                        }
1402                    }
1403
1404                    if mask_chunk.chunk_rows == 0 {
1405                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1406                            return Ok(None);
1407                        }
1408                        continue;
1409                    }
1410
1411                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1412
1413                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1414                    if read == 0 {
1415                        return Err(general_err!(
1416                            "reached end of column while expecting {} rows",
1417                            mask_chunk.chunk_rows
1418                        ));
1419                    }
1420                    if read != mask_chunk.chunk_rows {
1421                        return Err(general_err!(
1422                            "insufficient rows read from array reader - expected {}, got {}",
1423                            mask_chunk.chunk_rows,
1424                            read
1425                        ));
1426                    }
1427
1428                    let array = self.array_reader.consume_batch()?;
1429                    // The column reader exposes the projection as a struct array; convert this
1430                    // into a record batch before applying the boolean filter mask.
1431                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1432                        ArrowError::ParquetError(
1433                            "Struct array reader should return struct array".to_string(),
1434                        )
1435                    })?;
1436
1437                    let filtered_batch =
1438                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1439
1440                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1441                        return Err(general_err!(
1442                            "filtered rows mismatch selection - expected {}, got {}",
1443                            mask_chunk.selected_rows,
1444                            filtered_batch.num_rows()
1445                        ));
1446                    }
1447
1448                    if filtered_batch.num_rows() == 0 {
1449                        continue;
1450                    }
1451
1452                    return Ok(Some(filtered_batch));
1453                }
1454            }
1455            RowSelectionCursor::Selectors(selectors_cursor) => {
1456                while read_records < batch_size && !selectors_cursor.is_empty() {
1457                    let front = selectors_cursor.next_selector();
1458                    if front.skip {
1459                        let skipped = self.array_reader.skip_records(front.row_count)?;
1460
1461                        if skipped != front.row_count {
1462                            return Err(general_err!(
1463                                "failed to skip rows, expected {}, got {}",
1464                                front.row_count,
1465                                skipped
1466                            ));
1467                        }
1468                        continue;
1469                    }
1470
1471                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
1472                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1473                    if front.row_count == 0 {
1474                        continue;
1475                    }
1476
1477                    // try to read record
1478                    let need_read = batch_size - read_records;
1479                    let to_read = match front.row_count.checked_sub(need_read) {
1480                        Some(remaining) if remaining != 0 => {
1481                            // if page row count less than batch_size we must set batch size to page row count.
1482                            // add check avoid dead loop
1483                            selectors_cursor.return_selector(RowSelector::select(remaining));
1484                            need_read
1485                        }
1486                        _ => front.row_count,
1487                    };
1488                    match self.array_reader.read_records(to_read)? {
1489                        0 => break,
1490                        rec => read_records += rec,
1491                    };
1492                }
1493            }
1494            RowSelectionCursor::All => {
1495                self.array_reader.read_records(batch_size)?;
1496            }
1497        };
1498
1499        let array = self.array_reader.consume_batch()?;
1500        let struct_array = array.as_struct_opt().ok_or_else(|| {
1501            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1502        })?;
1503
1504        Ok(if struct_array.len() > 0 {
1505            Some(RecordBatch::from(struct_array))
1506        } else {
1507            None
1508        })
1509    }
1510}
1511
1512impl RecordBatchReader for ParquetRecordBatchReader {
1513    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1514    ///
1515    /// Note that the schema metadata will be stripped here. See
1516    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1517    fn schema(&self) -> SchemaRef {
1518        self.schema.clone()
1519    }
1520}
1521
1522impl ParquetRecordBatchReader {
1523    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1524    ///
1525    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1526    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1527        ParquetRecordBatchReaderBuilder::try_new(reader)?
1528            .with_batch_size(batch_size)
1529            .build()
1530    }
1531
1532    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1533    ///
1534    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1535    /// higher-level interface for reading parquet data from a file
1536    pub fn try_new_with_row_groups(
1537        levels: &FieldLevels,
1538        row_groups: &dyn RowGroups,
1539        batch_size: usize,
1540        selection: Option<RowSelection>,
1541    ) -> Result<Self> {
1542        // note metrics are not supported in this API
1543        let metrics = ArrowReaderMetrics::disabled();
1544        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1545            .with_batch_size(batch_size)
1546            .with_parquet_metadata(row_groups.metadata())
1547            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1548
1549        let read_plan = ReadPlanBuilder::new(batch_size)
1550            .with_selection(selection)
1551            .build();
1552
1553        Ok(Self {
1554            array_reader,
1555            schema: Arc::new(Schema::new(levels.fields.clone())),
1556            read_plan,
1557        })
1558    }
1559
1560    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1561    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1562    /// all rows will be returned
1563    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1564        let schema = match array_reader.get_data_type() {
1565            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1566            _ => unreachable!("Struct array reader's data type is not struct!"),
1567        };
1568
1569        Self {
1570            array_reader,
1571            schema: Arc::new(schema),
1572            read_plan,
1573        }
1574    }
1575
1576    #[inline(always)]
1577    pub(crate) fn batch_size(&self) -> usize {
1578        self.read_plan.batch_size()
1579    }
1580}
1581
1582#[cfg(test)]
1583pub(crate) mod tests {
1584    use std::cmp::min;
1585    use std::collections::{HashMap, VecDeque};
1586    use std::fmt::Formatter;
1587    use std::fs::File;
1588    use std::io::Seek;
1589    use std::path::PathBuf;
1590    use std::sync::Arc;
1591
1592    use rand::rngs::StdRng;
1593    use rand::{Rng, RngCore, SeedableRng, random, rng};
1594    use tempfile::tempfile;
1595
1596    use crate::arrow::arrow_reader::{
1597        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1598        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1599    };
1600    use crate::arrow::schema::{
1601        add_encoded_arrow_schema_to_metadata,
1602        virtual_type::{RowGroupIndex, RowNumber},
1603    };
1604    use crate::arrow::{ArrowWriter, ProjectionMask};
1605    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1606    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1607    use crate::data_type::{
1608        BoolType, ByteArray, ByteArrayType, DataType, DoubleType, FixedLenByteArray,
1609        FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type,
1610    };
1611    use crate::errors::Result;
1612    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1613    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1614    use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1615    use crate::schema::parser::parse_message_type;
1616    use crate::schema::types::{Type, TypePtr};
1617    use crate::util::test_common::rand_gen::RandGen;
1618    use arrow_array::builder::*;
1619    use arrow_array::cast::AsArray;
1620    use arrow_array::types::{
1621        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1622        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1623        Time64MicrosecondType,
1624    };
1625    use arrow_array::*;
1626    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1627    use arrow_data::{ArrayData, ArrayDataBuilder};
1628    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1629    use arrow_select::concat::concat_batches;
1630    use bytes::Bytes;
1631    use half::f16;
1632    use num_traits::PrimInt;
1633
1634    #[test]
1635    fn test_arrow_reader_all_columns() {
1636        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1637
1638        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1639        let original_schema = Arc::clone(builder.schema());
1640        let reader = builder.build().unwrap();
1641
1642        // Verify that the schema was correctly parsed
1643        assert_eq!(original_schema.fields(), reader.schema().fields());
1644    }
1645
1646    #[test]
1647    fn test_reuse_schema() {
1648        let file = get_test_file("parquet/alltypes-java.parquet");
1649
1650        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1651        let expected = builder.metadata;
1652        let schema = expected.file_metadata().schema_descr_ptr();
1653
1654        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1655        let builder =
1656            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1657
1658        // Verify that the metadata matches
1659        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1660    }
1661
1662    #[test]
1663    fn test_page_encoding_stats_mask() {
1664        let testdata = arrow::util::test_util::parquet_test_data();
1665        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1666        let file = File::open(path).unwrap();
1667
1668        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1669        let builder =
1670            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1671
1672        let row_group_metadata = builder.metadata.row_group(0);
1673
1674        // test page encoding stats
1675        let page_encoding_stats = row_group_metadata
1676            .column(0)
1677            .page_encoding_stats_mask()
1678            .unwrap();
1679        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1680        let page_encoding_stats = row_group_metadata
1681            .column(2)
1682            .page_encoding_stats_mask()
1683            .unwrap();
1684        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1685    }
1686
1687    #[test]
1688    fn test_stats_stats_skipped() {
1689        let testdata = arrow::util::test_util::parquet_test_data();
1690        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1691        let file = File::open(path).unwrap();
1692
1693        // test skipping all
1694        let arrow_options = ArrowReaderOptions::new()
1695            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1696            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1697        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1698            file.try_clone().unwrap(),
1699            arrow_options,
1700        )
1701        .unwrap();
1702
1703        let row_group_metadata = builder.metadata.row_group(0);
1704        for column in row_group_metadata.columns() {
1705            assert!(column.page_encoding_stats().is_none());
1706            assert!(column.page_encoding_stats_mask().is_none());
1707            assert!(column.statistics().is_none());
1708        }
1709
1710        // test skipping all but one column and converting to mask
1711        let arrow_options = ArrowReaderOptions::new()
1712            .with_encoding_stats_as_mask(true)
1713            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1714            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1715        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1716            file.try_clone().unwrap(),
1717            arrow_options,
1718        )
1719        .unwrap();
1720
1721        let row_group_metadata = builder.metadata.row_group(0);
1722        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1723            assert!(column.page_encoding_stats().is_none());
1724            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1725            assert_eq!(column.statistics().is_some(), idx == 0);
1726        }
1727    }
1728
1729    #[test]
1730    fn test_size_stats_stats_skipped() {
1731        let testdata = arrow::util::test_util::parquet_test_data();
1732        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1733        let file = File::open(path).unwrap();
1734
1735        // test skipping all
1736        let arrow_options =
1737            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1738        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1739            file.try_clone().unwrap(),
1740            arrow_options,
1741        )
1742        .unwrap();
1743
1744        let row_group_metadata = builder.metadata.row_group(0);
1745        for column in row_group_metadata.columns() {
1746            assert!(column.repetition_level_histogram().is_none());
1747            assert!(column.definition_level_histogram().is_none());
1748            assert!(column.unencoded_byte_array_data_bytes().is_none());
1749        }
1750
1751        // test skipping all but one column and converting to mask
1752        let arrow_options = ArrowReaderOptions::new()
1753            .with_encoding_stats_as_mask(true)
1754            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1755        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1756            file.try_clone().unwrap(),
1757            arrow_options,
1758        )
1759        .unwrap();
1760
1761        let row_group_metadata = builder.metadata.row_group(0);
1762        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1763            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1764            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1765            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1766        }
1767    }
1768
1769    #[test]
1770    fn test_arrow_reader_single_column() {
1771        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1772
1773        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1774        let original_schema = Arc::clone(builder.schema());
1775
1776        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1777        let reader = builder.with_projection(mask).build().unwrap();
1778
1779        // Verify that the schema was correctly parsed
1780        assert_eq!(1, reader.schema().fields().len());
1781        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1782    }
1783
1784    #[test]
1785    fn test_arrow_reader_single_column_by_name() {
1786        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1787
1788        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1789        let original_schema = Arc::clone(builder.schema());
1790
1791        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1792        let reader = builder.with_projection(mask).build().unwrap();
1793
1794        // Verify that the schema was correctly parsed
1795        assert_eq!(1, reader.schema().fields().len());
1796        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1797    }
1798
1799    #[test]
1800    fn test_null_column_reader_test() {
1801        let mut file = tempfile::tempfile().unwrap();
1802
1803        let schema = "
1804            message message {
1805                OPTIONAL INT32 int32;
1806            }
1807        ";
1808        let schema = Arc::new(parse_message_type(schema).unwrap());
1809
1810        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1811        generate_single_column_file_with_data::<Int32Type>(
1812            &[vec![], vec![]],
1813            Some(&def_levels),
1814            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1815            schema,
1816            Some(Field::new("int32", ArrowDataType::Null, true)),
1817            &Default::default(),
1818        )
1819        .unwrap();
1820
1821        file.rewind().unwrap();
1822
1823        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1824        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1825
1826        assert_eq!(batches.len(), 4);
1827        for batch in &batches[0..3] {
1828            assert_eq!(batch.num_rows(), 2);
1829            assert_eq!(batch.num_columns(), 1);
1830            assert_eq!(batch.column(0).null_count(), 2);
1831        }
1832
1833        assert_eq!(batches[3].num_rows(), 1);
1834        assert_eq!(batches[3].num_columns(), 1);
1835        assert_eq!(batches[3].column(0).null_count(), 1);
1836    }
1837
1838    #[test]
1839    fn test_primitive_single_column_reader_test() {
1840        run_single_column_reader_tests::<BoolType, _, BoolType>(
1841            2,
1842            ConvertedType::NONE,
1843            None,
1844            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1845            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1846        );
1847        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1848            2,
1849            ConvertedType::NONE,
1850            None,
1851            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1852            &[
1853                Encoding::PLAIN,
1854                Encoding::RLE_DICTIONARY,
1855                Encoding::DELTA_BINARY_PACKED,
1856                Encoding::BYTE_STREAM_SPLIT,
1857            ],
1858        );
1859        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1860            2,
1861            ConvertedType::NONE,
1862            None,
1863            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1864            &[
1865                Encoding::PLAIN,
1866                Encoding::RLE_DICTIONARY,
1867                Encoding::DELTA_BINARY_PACKED,
1868                Encoding::BYTE_STREAM_SPLIT,
1869            ],
1870        );
1871        run_single_column_reader_tests::<FloatType, _, FloatType>(
1872            2,
1873            ConvertedType::NONE,
1874            None,
1875            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1876            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1877        );
1878    }
1879
1880    #[test]
1881    fn test_unsigned_primitive_single_column_reader_test() {
1882        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1883            2,
1884            ConvertedType::UINT_32,
1885            Some(ArrowDataType::UInt32),
1886            |vals| {
1887                Arc::new(UInt32Array::from_iter(
1888                    vals.iter().map(|x| x.map(|x| x as u32)),
1889                ))
1890            },
1891            &[
1892                Encoding::PLAIN,
1893                Encoding::RLE_DICTIONARY,
1894                Encoding::DELTA_BINARY_PACKED,
1895            ],
1896        );
1897        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1898            2,
1899            ConvertedType::UINT_64,
1900            Some(ArrowDataType::UInt64),
1901            |vals| {
1902                Arc::new(UInt64Array::from_iter(
1903                    vals.iter().map(|x| x.map(|x| x as u64)),
1904                ))
1905            },
1906            &[
1907                Encoding::PLAIN,
1908                Encoding::RLE_DICTIONARY,
1909                Encoding::DELTA_BINARY_PACKED,
1910            ],
1911        );
1912    }
1913
1914    #[test]
1915    fn test_unsigned_roundtrip() {
1916        let schema = Arc::new(Schema::new(vec![
1917            Field::new("uint32", ArrowDataType::UInt32, true),
1918            Field::new("uint64", ArrowDataType::UInt64, true),
1919        ]));
1920
1921        let mut buf = Vec::with_capacity(1024);
1922        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1923
1924        let original = RecordBatch::try_new(
1925            schema,
1926            vec![
1927                Arc::new(UInt32Array::from_iter_values([
1928                    0,
1929                    i32::MAX as u32,
1930                    u32::MAX,
1931                ])),
1932                Arc::new(UInt64Array::from_iter_values([
1933                    0,
1934                    i64::MAX as u64,
1935                    u64::MAX,
1936                ])),
1937            ],
1938        )
1939        .unwrap();
1940
1941        writer.write(&original).unwrap();
1942        writer.close().unwrap();
1943
1944        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1945        let ret = reader.next().unwrap().unwrap();
1946        assert_eq!(ret, original);
1947
1948        // Check they can be downcast to the correct type
1949        ret.column(0)
1950            .as_any()
1951            .downcast_ref::<UInt32Array>()
1952            .unwrap();
1953
1954        ret.column(1)
1955            .as_any()
1956            .downcast_ref::<UInt64Array>()
1957            .unwrap();
1958    }
1959
1960    #[test]
1961    fn test_float16_roundtrip() -> Result<()> {
1962        let schema = Arc::new(Schema::new(vec![
1963            Field::new("float16", ArrowDataType::Float16, false),
1964            Field::new("float16-nullable", ArrowDataType::Float16, true),
1965        ]));
1966
1967        let mut buf = Vec::with_capacity(1024);
1968        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1969
1970        let original = RecordBatch::try_new(
1971            schema,
1972            vec![
1973                Arc::new(Float16Array::from_iter_values([
1974                    f16::EPSILON,
1975                    f16::MIN,
1976                    f16::MAX,
1977                    f16::NAN,
1978                    f16::INFINITY,
1979                    f16::NEG_INFINITY,
1980                    f16::ONE,
1981                    f16::NEG_ONE,
1982                    f16::ZERO,
1983                    f16::NEG_ZERO,
1984                    f16::E,
1985                    f16::PI,
1986                    f16::FRAC_1_PI,
1987                ])),
1988                Arc::new(Float16Array::from(vec![
1989                    None,
1990                    None,
1991                    None,
1992                    Some(f16::NAN),
1993                    Some(f16::INFINITY),
1994                    Some(f16::NEG_INFINITY),
1995                    None,
1996                    None,
1997                    None,
1998                    None,
1999                    None,
2000                    None,
2001                    Some(f16::FRAC_1_PI),
2002                ])),
2003            ],
2004        )?;
2005
2006        writer.write(&original)?;
2007        writer.close()?;
2008
2009        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2010        let ret = reader.next().unwrap()?;
2011        assert_eq!(ret, original);
2012
2013        // Ensure can be downcast to the correct type
2014        ret.column(0).as_primitive::<Float16Type>();
2015        ret.column(1).as_primitive::<Float16Type>();
2016
2017        Ok(())
2018    }
2019
2020    #[test]
2021    fn test_time_utc_roundtrip() -> Result<()> {
2022        let schema = Arc::new(Schema::new(vec![
2023            Field::new(
2024                "time_millis",
2025                ArrowDataType::Time32(TimeUnit::Millisecond),
2026                true,
2027            )
2028            .with_metadata(HashMap::from_iter(vec![(
2029                "adjusted_to_utc".to_string(),
2030                "".to_string(),
2031            )])),
2032            Field::new(
2033                "time_micros",
2034                ArrowDataType::Time64(TimeUnit::Microsecond),
2035                true,
2036            )
2037            .with_metadata(HashMap::from_iter(vec![(
2038                "adjusted_to_utc".to_string(),
2039                "".to_string(),
2040            )])),
2041        ]));
2042
2043        let mut buf = Vec::with_capacity(1024);
2044        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2045
2046        let original = RecordBatch::try_new(
2047            schema,
2048            vec![
2049                Arc::new(Time32MillisecondArray::from(vec![
2050                    Some(-1),
2051                    Some(0),
2052                    Some(86_399_000),
2053                    Some(86_400_000),
2054                    Some(86_401_000),
2055                    None,
2056                ])),
2057                Arc::new(Time64MicrosecondArray::from(vec![
2058                    Some(-1),
2059                    Some(0),
2060                    Some(86_399 * 1_000_000),
2061                    Some(86_400 * 1_000_000),
2062                    Some(86_401 * 1_000_000),
2063                    None,
2064                ])),
2065            ],
2066        )?;
2067
2068        writer.write(&original)?;
2069        writer.close()?;
2070
2071        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2072        let ret = reader.next().unwrap()?;
2073        assert_eq!(ret, original);
2074
2075        // Ensure can be downcast to the correct type
2076        ret.column(0).as_primitive::<Time32MillisecondType>();
2077        ret.column(1).as_primitive::<Time64MicrosecondType>();
2078
2079        Ok(())
2080    }
2081
2082    #[test]
2083    fn test_date32_roundtrip() -> Result<()> {
2084        use arrow_array::Date32Array;
2085
2086        let schema = Arc::new(Schema::new(vec![Field::new(
2087            "date32",
2088            ArrowDataType::Date32,
2089            false,
2090        )]));
2091
2092        let mut buf = Vec::with_capacity(1024);
2093
2094        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2095
2096        let original = RecordBatch::try_new(
2097            schema,
2098            vec![Arc::new(Date32Array::from(vec![
2099                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2100            ]))],
2101        )?;
2102
2103        writer.write(&original)?;
2104        writer.close()?;
2105
2106        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2107        let ret = reader.next().unwrap()?;
2108        assert_eq!(ret, original);
2109
2110        // Ensure can be downcast to the correct type
2111        ret.column(0).as_primitive::<Date32Type>();
2112
2113        Ok(())
2114    }
2115
2116    #[test]
2117    fn test_date64_roundtrip() -> Result<()> {
2118        use arrow_array::Date64Array;
2119
2120        let schema = Arc::new(Schema::new(vec![
2121            Field::new("small-date64", ArrowDataType::Date64, false),
2122            Field::new("big-date64", ArrowDataType::Date64, false),
2123            Field::new("invalid-date64", ArrowDataType::Date64, false),
2124        ]));
2125
2126        let mut default_buf = Vec::with_capacity(1024);
2127        let mut coerce_buf = Vec::with_capacity(1024);
2128
2129        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2130
2131        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2132        let mut coerce_writer =
2133            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2134
2135        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2136
2137        let original = RecordBatch::try_new(
2138            schema,
2139            vec![
2140                // small-date64
2141                Arc::new(Date64Array::from(vec![
2142                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2143                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2144                    0,
2145                    1_000 * NUM_MILLISECONDS_IN_DAY,
2146                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2147                ])),
2148                // big-date64
2149                Arc::new(Date64Array::from(vec![
2150                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2151                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2152                    0,
2153                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2154                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2155                ])),
2156                // invalid-date64
2157                Arc::new(Date64Array::from(vec![
2158                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2159                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2160                    1,
2161                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2162                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2163                ])),
2164            ],
2165        )?;
2166
2167        default_writer.write(&original)?;
2168        coerce_writer.write(&original)?;
2169
2170        default_writer.close()?;
2171        coerce_writer.close()?;
2172
2173        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2174        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2175
2176        let default_ret = default_reader.next().unwrap()?;
2177        let coerce_ret = coerce_reader.next().unwrap()?;
2178
2179        // Roundtrip should be successful when default writer used
2180        assert_eq!(default_ret, original);
2181
2182        // Only small-date64 should roundtrip successfully when coerce_types writer is used
2183        assert_eq!(coerce_ret.column(0), original.column(0));
2184        assert_ne!(coerce_ret.column(1), original.column(1));
2185        assert_ne!(coerce_ret.column(2), original.column(2));
2186
2187        // Ensure both can be downcast to the correct type
2188        default_ret.column(0).as_primitive::<Date64Type>();
2189        coerce_ret.column(0).as_primitive::<Date64Type>();
2190
2191        Ok(())
2192    }
2193    struct RandFixedLenGen {}
2194
2195    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2196        fn r#gen(len: i32) -> FixedLenByteArray {
2197            let mut v = vec![0u8; len as usize];
2198            rng().fill_bytes(&mut v);
2199            ByteArray::from(v).into()
2200        }
2201    }
2202
2203    #[test]
2204    fn test_fixed_length_binary_column_reader() {
2205        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2206            20,
2207            ConvertedType::NONE,
2208            None,
2209            |vals| {
2210                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2211                for val in vals {
2212                    match val {
2213                        Some(b) => builder.append_value(b).unwrap(),
2214                        None => builder.append_null(),
2215                    }
2216                }
2217                Arc::new(builder.finish())
2218            },
2219            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2220        );
2221    }
2222
2223    #[test]
2224    fn test_interval_day_time_column_reader() {
2225        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2226            12,
2227            ConvertedType::INTERVAL,
2228            None,
2229            |vals| {
2230                Arc::new(
2231                    vals.iter()
2232                        .map(|x| {
2233                            x.as_ref().map(|b| IntervalDayTime {
2234                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2235                                milliseconds: i32::from_le_bytes(
2236                                    b.as_ref()[8..12].try_into().unwrap(),
2237                                ),
2238                            })
2239                        })
2240                        .collect::<IntervalDayTimeArray>(),
2241                )
2242            },
2243            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2244        );
2245    }
2246
2247    #[test]
2248    fn test_int96_single_column_reader_test() {
2249        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2250
2251        type TypeHintAndConversionFunction =
2252            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2253
2254        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2255            // Test without a specified ArrowType hint.
2256            (None, |vals: &[Option<Int96>]| {
2257                Arc::new(TimestampNanosecondArray::from_iter(
2258                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2259                )) as ArrayRef
2260            }),
2261            // Test other TimeUnits as ArrowType hints.
2262            (
2263                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2264                |vals: &[Option<Int96>]| {
2265                    Arc::new(TimestampSecondArray::from_iter(
2266                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2267                    )) as ArrayRef
2268                },
2269            ),
2270            (
2271                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2272                |vals: &[Option<Int96>]| {
2273                    Arc::new(TimestampMillisecondArray::from_iter(
2274                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2275                    )) as ArrayRef
2276                },
2277            ),
2278            (
2279                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2280                |vals: &[Option<Int96>]| {
2281                    Arc::new(TimestampMicrosecondArray::from_iter(
2282                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2283                    )) as ArrayRef
2284                },
2285            ),
2286            (
2287                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2288                |vals: &[Option<Int96>]| {
2289                    Arc::new(TimestampNanosecondArray::from_iter(
2290                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2291                    )) as ArrayRef
2292                },
2293            ),
2294            // Test another timezone with TimeUnit as ArrowType hints.
2295            (
2296                Some(ArrowDataType::Timestamp(
2297                    TimeUnit::Second,
2298                    Some(Arc::from("-05:00")),
2299                )),
2300                |vals: &[Option<Int96>]| {
2301                    Arc::new(
2302                        TimestampSecondArray::from_iter(
2303                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2304                        )
2305                        .with_timezone("-05:00"),
2306                    ) as ArrayRef
2307                },
2308            ),
2309        ];
2310
2311        resolutions.iter().for_each(|(arrow_type, converter)| {
2312            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2313                2,
2314                ConvertedType::NONE,
2315                arrow_type.clone(),
2316                converter,
2317                encodings,
2318            );
2319        })
2320    }
2321
2322    #[test]
2323    fn test_int96_from_spark_file_with_provided_schema() {
2324        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2325        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2326        // microsecond resolution for the Parquet reader to interpret these values correctly.
2327        use arrow_schema::DataType::Timestamp;
2328        let test_data = arrow::util::test_util::parquet_test_data();
2329        let path = format!("{test_data}/int96_from_spark.parquet");
2330        let file = File::open(path).unwrap();
2331
2332        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2333            "a",
2334            Timestamp(TimeUnit::Microsecond, None),
2335            true,
2336        )]));
2337        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2338
2339        let mut record_reader =
2340            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2341                .unwrap()
2342                .build()
2343                .unwrap();
2344
2345        let batch = record_reader.next().unwrap().unwrap();
2346        assert_eq!(batch.num_columns(), 1);
2347        let column = batch.column(0);
2348        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2349
2350        let expected = Arc::new(Int64Array::from(vec![
2351            Some(1704141296123456),
2352            Some(1704070800000000),
2353            Some(253402225200000000),
2354            Some(1735599600000000),
2355            None,
2356            Some(9089380393200000000),
2357        ]));
2358
2359        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2360        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2361        // anyway, so this should be a zero-copy non-modifying cast.
2362
2363        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2364        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2365
2366        assert_eq!(casted_timestamps.len(), expected.len());
2367
2368        casted_timestamps
2369            .iter()
2370            .zip(expected.iter())
2371            .for_each(|(lhs, rhs)| {
2372                assert_eq!(lhs, rhs);
2373            });
2374    }
2375
2376    #[test]
2377    fn test_int96_from_spark_file_without_provided_schema() {
2378        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2379        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2380        // values when read as nanosecond resolution overflow and result in garbage values.
2381        use arrow_schema::DataType::Timestamp;
2382        let test_data = arrow::util::test_util::parquet_test_data();
2383        let path = format!("{test_data}/int96_from_spark.parquet");
2384        let file = File::open(path).unwrap();
2385
2386        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2387            .unwrap()
2388            .build()
2389            .unwrap();
2390
2391        let batch = record_reader.next().unwrap().unwrap();
2392        assert_eq!(batch.num_columns(), 1);
2393        let column = batch.column(0);
2394        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2395
2396        let expected = Arc::new(Int64Array::from(vec![
2397            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2398            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2399            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2400            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2401            None,
2402            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2403        ]));
2404
2405        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2406        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2407        // anyway, so this should be a zero-copy non-modifying cast.
2408
2409        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2410        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2411
2412        assert_eq!(casted_timestamps.len(), expected.len());
2413
2414        casted_timestamps
2415            .iter()
2416            .zip(expected.iter())
2417            .for_each(|(lhs, rhs)| {
2418                assert_eq!(lhs, rhs);
2419            });
2420    }
2421
2422    struct RandUtf8Gen {}
2423
2424    impl RandGen<ByteArrayType> for RandUtf8Gen {
2425        fn r#gen(len: i32) -> ByteArray {
2426            Int32Type::r#gen(len).to_string().as_str().into()
2427        }
2428    }
2429
2430    #[test]
2431    fn test_utf8_single_column_reader_test() {
2432        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2433            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2434                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2435            })))
2436        }
2437
2438        let encodings = &[
2439            Encoding::PLAIN,
2440            Encoding::RLE_DICTIONARY,
2441            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2442            Encoding::DELTA_BYTE_ARRAY,
2443        ];
2444
2445        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2446            2,
2447            ConvertedType::NONE,
2448            None,
2449            |vals| {
2450                Arc::new(BinaryArray::from_iter(
2451                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2452                ))
2453            },
2454            encodings,
2455        );
2456
2457        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2458            2,
2459            ConvertedType::UTF8,
2460            None,
2461            string_converter::<i32>,
2462            encodings,
2463        );
2464
2465        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2466            2,
2467            ConvertedType::UTF8,
2468            Some(ArrowDataType::Utf8),
2469            string_converter::<i32>,
2470            encodings,
2471        );
2472
2473        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2474            2,
2475            ConvertedType::UTF8,
2476            Some(ArrowDataType::LargeUtf8),
2477            string_converter::<i64>,
2478            encodings,
2479        );
2480
2481        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2482        for key in &small_key_types {
2483            for encoding in encodings {
2484                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2485                opts.encoding = *encoding;
2486
2487                let data_type =
2488                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2489
2490                // Cannot run full test suite as keys overflow, run small test instead
2491                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2492                    opts,
2493                    2,
2494                    ConvertedType::UTF8,
2495                    Some(data_type.clone()),
2496                    move |vals| {
2497                        let vals = string_converter::<i32>(vals);
2498                        arrow::compute::cast(&vals, &data_type).unwrap()
2499                    },
2500                );
2501            }
2502        }
2503
2504        let key_types = [
2505            ArrowDataType::Int16,
2506            ArrowDataType::UInt16,
2507            ArrowDataType::Int32,
2508            ArrowDataType::UInt32,
2509            ArrowDataType::Int64,
2510            ArrowDataType::UInt64,
2511        ];
2512
2513        for key in &key_types {
2514            let data_type =
2515                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2516
2517            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2518                2,
2519                ConvertedType::UTF8,
2520                Some(data_type.clone()),
2521                move |vals| {
2522                    let vals = string_converter::<i32>(vals);
2523                    arrow::compute::cast(&vals, &data_type).unwrap()
2524                },
2525                encodings,
2526            );
2527
2528            let data_type = ArrowDataType::Dictionary(
2529                Box::new(key.clone()),
2530                Box::new(ArrowDataType::LargeUtf8),
2531            );
2532
2533            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2534                2,
2535                ConvertedType::UTF8,
2536                Some(data_type.clone()),
2537                move |vals| {
2538                    let vals = string_converter::<i64>(vals);
2539                    arrow::compute::cast(&vals, &data_type).unwrap()
2540                },
2541                encodings,
2542            );
2543        }
2544    }
2545
2546    #[test]
2547    fn test_decimal_nullable_struct() {
2548        let decimals = Decimal256Array::from_iter_values(
2549            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2550        );
2551
2552        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2553            "decimals",
2554            decimals.data_type().clone(),
2555            false,
2556        )])))
2557        .len(8)
2558        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2559        .child_data(vec![decimals.into_data()])
2560        .build()
2561        .unwrap();
2562
2563        let written =
2564            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2565                .unwrap();
2566
2567        let mut buffer = Vec::with_capacity(1024);
2568        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2569        writer.write(&written).unwrap();
2570        writer.close().unwrap();
2571
2572        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2573            .unwrap()
2574            .collect::<Result<Vec<_>, _>>()
2575            .unwrap();
2576
2577        assert_eq!(&written.slice(0, 3), &read[0]);
2578        assert_eq!(&written.slice(3, 3), &read[1]);
2579        assert_eq!(&written.slice(6, 2), &read[2]);
2580    }
2581
2582    #[test]
2583    fn test_int32_nullable_struct() {
2584        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2585        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2586            "int32",
2587            int32.data_type().clone(),
2588            false,
2589        )])))
2590        .len(8)
2591        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2592        .child_data(vec![int32.into_data()])
2593        .build()
2594        .unwrap();
2595
2596        let written =
2597            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2598                .unwrap();
2599
2600        let mut buffer = Vec::with_capacity(1024);
2601        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2602        writer.write(&written).unwrap();
2603        writer.close().unwrap();
2604
2605        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2606            .unwrap()
2607            .collect::<Result<Vec<_>, _>>()
2608            .unwrap();
2609
2610        assert_eq!(&written.slice(0, 3), &read[0]);
2611        assert_eq!(&written.slice(3, 3), &read[1]);
2612        assert_eq!(&written.slice(6, 2), &read[2]);
2613    }
2614
2615    #[test]
2616    fn test_decimal_list() {
2617        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2618
2619        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2620        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2621            decimals.data_type().clone(),
2622            false,
2623        ))))
2624        .len(7)
2625        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2626        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2627        .child_data(vec![decimals.into_data()])
2628        .build()
2629        .unwrap();
2630
2631        let written =
2632            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2633                .unwrap();
2634
2635        let mut buffer = Vec::with_capacity(1024);
2636        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2637        writer.write(&written).unwrap();
2638        writer.close().unwrap();
2639
2640        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2641            .unwrap()
2642            .collect::<Result<Vec<_>, _>>()
2643            .unwrap();
2644
2645        assert_eq!(&written.slice(0, 3), &read[0]);
2646        assert_eq!(&written.slice(3, 3), &read[1]);
2647        assert_eq!(&written.slice(6, 1), &read[2]);
2648    }
2649
2650    #[test]
2651    fn test_read_decimal_file() {
2652        use arrow_array::Decimal128Array;
2653        let testdata = arrow::util::test_util::parquet_test_data();
2654        let file_variants = vec![
2655            ("byte_array", 4),
2656            ("fixed_length", 25),
2657            ("int32", 4),
2658            ("int64", 10),
2659        ];
2660        for (prefix, target_precision) in file_variants {
2661            let path = format!("{testdata}/{prefix}_decimal.parquet");
2662            let file = File::open(path).unwrap();
2663            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2664
2665            let batch = record_reader.next().unwrap().unwrap();
2666            assert_eq!(batch.num_rows(), 24);
2667            let col = batch
2668                .column(0)
2669                .as_any()
2670                .downcast_ref::<Decimal128Array>()
2671                .unwrap();
2672
2673            let expected = 1..25;
2674
2675            assert_eq!(col.precision(), target_precision);
2676            assert_eq!(col.scale(), 2);
2677
2678            for (i, v) in expected.enumerate() {
2679                assert_eq!(col.value(i), v * 100_i128);
2680            }
2681        }
2682    }
2683
2684    #[test]
2685    fn test_read_float16_nonzeros_file() {
2686        use arrow_array::Float16Array;
2687        let testdata = arrow::util::test_util::parquet_test_data();
2688        // see https://github.com/apache/parquet-testing/pull/40
2689        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2690        let file = File::open(path).unwrap();
2691        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2692
2693        let batch = record_reader.next().unwrap().unwrap();
2694        assert_eq!(batch.num_rows(), 8);
2695        let col = batch
2696            .column(0)
2697            .as_any()
2698            .downcast_ref::<Float16Array>()
2699            .unwrap();
2700
2701        let f16_two = f16::ONE + f16::ONE;
2702
2703        assert_eq!(col.null_count(), 1);
2704        assert!(col.is_null(0));
2705        assert_eq!(col.value(1), f16::ONE);
2706        assert_eq!(col.value(2), -f16_two);
2707        assert!(col.value(3).is_nan());
2708        assert_eq!(col.value(4), f16::ZERO);
2709        assert!(col.value(4).is_sign_positive());
2710        assert_eq!(col.value(5), f16::NEG_ONE);
2711        assert_eq!(col.value(6), f16::NEG_ZERO);
2712        assert!(col.value(6).is_sign_negative());
2713        assert_eq!(col.value(7), f16_two);
2714    }
2715
2716    #[test]
2717    fn test_read_float16_zeros_file() {
2718        use arrow_array::Float16Array;
2719        let testdata = arrow::util::test_util::parquet_test_data();
2720        // see https://github.com/apache/parquet-testing/pull/40
2721        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2722        let file = File::open(path).unwrap();
2723        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2724
2725        let batch = record_reader.next().unwrap().unwrap();
2726        assert_eq!(batch.num_rows(), 3);
2727        let col = batch
2728            .column(0)
2729            .as_any()
2730            .downcast_ref::<Float16Array>()
2731            .unwrap();
2732
2733        assert_eq!(col.null_count(), 1);
2734        assert!(col.is_null(0));
2735        assert_eq!(col.value(1), f16::ZERO);
2736        assert!(col.value(1).is_sign_positive());
2737        assert!(col.value(2).is_nan());
2738    }
2739
2740    #[test]
2741    fn test_read_float32_float64_byte_stream_split() {
2742        let path = format!(
2743            "{}/byte_stream_split.zstd.parquet",
2744            arrow::util::test_util::parquet_test_data(),
2745        );
2746        let file = File::open(path).unwrap();
2747        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2748
2749        let mut row_count = 0;
2750        for batch in record_reader {
2751            let batch = batch.unwrap();
2752            row_count += batch.num_rows();
2753            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2754            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2755
2756            // This file contains floats from a standard normal distribution
2757            for &x in f32_col.values() {
2758                assert!(x > -10.0);
2759                assert!(x < 10.0);
2760            }
2761            for &x in f64_col.values() {
2762                assert!(x > -10.0);
2763                assert!(x < 10.0);
2764            }
2765        }
2766        assert_eq!(row_count, 300);
2767    }
2768
2769    #[test]
2770    fn test_read_extended_byte_stream_split() {
2771        let path = format!(
2772            "{}/byte_stream_split_extended.gzip.parquet",
2773            arrow::util::test_util::parquet_test_data(),
2774        );
2775        let file = File::open(path).unwrap();
2776        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2777
2778        let mut row_count = 0;
2779        for batch in record_reader {
2780            let batch = batch.unwrap();
2781            row_count += batch.num_rows();
2782
2783            // 0,1 are f16
2784            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2785            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2786            assert_eq!(f16_col.len(), f16_bss.len());
2787            f16_col
2788                .iter()
2789                .zip(f16_bss.iter())
2790                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2791
2792            // 2,3 are f32
2793            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2794            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2795            assert_eq!(f32_col.len(), f32_bss.len());
2796            f32_col
2797                .iter()
2798                .zip(f32_bss.iter())
2799                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2800
2801            // 4,5 are f64
2802            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2803            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2804            assert_eq!(f64_col.len(), f64_bss.len());
2805            f64_col
2806                .iter()
2807                .zip(f64_bss.iter())
2808                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2809
2810            // 6,7 are i32
2811            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2812            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2813            assert_eq!(i32_col.len(), i32_bss.len());
2814            i32_col
2815                .iter()
2816                .zip(i32_bss.iter())
2817                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2818
2819            // 8,9 are i64
2820            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2821            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2822            assert_eq!(i64_col.len(), i64_bss.len());
2823            i64_col
2824                .iter()
2825                .zip(i64_bss.iter())
2826                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2827
2828            // 10,11 are FLBA(5)
2829            let flba_col = batch.column(10).as_fixed_size_binary();
2830            let flba_bss = batch.column(11).as_fixed_size_binary();
2831            assert_eq!(flba_col.len(), flba_bss.len());
2832            flba_col
2833                .iter()
2834                .zip(flba_bss.iter())
2835                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2836
2837            // 12,13 are FLBA(4) (decimal(7,3))
2838            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2839            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2840            assert_eq!(dec_col.len(), dec_bss.len());
2841            dec_col
2842                .iter()
2843                .zip(dec_bss.iter())
2844                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2845        }
2846        assert_eq!(row_count, 200);
2847    }
2848
2849    #[test]
2850    fn test_read_incorrect_map_schema_file() {
2851        let testdata = arrow::util::test_util::parquet_test_data();
2852        // see https://github.com/apache/parquet-testing/pull/47
2853        let path = format!("{testdata}/incorrect_map_schema.parquet");
2854        let file = File::open(path).unwrap();
2855        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2856
2857        let batch = record_reader.next().unwrap().unwrap();
2858        assert_eq!(batch.num_rows(), 1);
2859
2860        let expected_schema = Schema::new(vec![Field::new(
2861            "my_map",
2862            ArrowDataType::Map(
2863                Arc::new(Field::new(
2864                    "key_value",
2865                    ArrowDataType::Struct(Fields::from(vec![
2866                        Field::new("key", ArrowDataType::Utf8, false),
2867                        Field::new("value", ArrowDataType::Utf8, true),
2868                    ])),
2869                    false,
2870                )),
2871                false,
2872            ),
2873            true,
2874        )]);
2875        assert_eq!(batch.schema().as_ref(), &expected_schema);
2876
2877        assert_eq!(batch.num_rows(), 1);
2878        assert_eq!(batch.column(0).null_count(), 0);
2879        assert_eq!(
2880            batch.column(0).as_map().keys().as_ref(),
2881            &StringArray::from(vec!["parent", "name"])
2882        );
2883        assert_eq!(
2884            batch.column(0).as_map().values().as_ref(),
2885            &StringArray::from(vec!["another", "report"])
2886        );
2887    }
2888
2889    #[test]
2890    fn test_read_dict_fixed_size_binary() {
2891        let schema = Arc::new(Schema::new(vec![Field::new(
2892            "a",
2893            ArrowDataType::Dictionary(
2894                Box::new(ArrowDataType::UInt8),
2895                Box::new(ArrowDataType::FixedSizeBinary(8)),
2896            ),
2897            true,
2898        )]));
2899        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2900        let values = FixedSizeBinaryArray::try_from_iter(
2901            vec![
2902                (0u8..8u8).collect::<Vec<u8>>(),
2903                (24u8..32u8).collect::<Vec<u8>>(),
2904            ]
2905            .into_iter(),
2906        )
2907        .unwrap();
2908        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2909        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2910
2911        let mut buffer = Vec::with_capacity(1024);
2912        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2913        writer.write(&batch).unwrap();
2914        writer.close().unwrap();
2915        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2916            .unwrap()
2917            .collect::<Result<Vec<_>, _>>()
2918            .unwrap();
2919
2920        assert_eq!(read.len(), 1);
2921        assert_eq!(&batch, &read[0])
2922    }
2923
2924    #[test]
2925    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2926        // the `StructArrayReader` will check the definition and repetition levels of the first
2927        // child column in the struct to determine nullability for the struct. If the first
2928        // column's is being read by `ByteArrayDictionaryReader` we need to ensure that the
2929        // nullability is interpreted  correctly from the rep/def level buffers managed by the
2930        // buffers managed by this array reader.
2931
2932        let struct_fields = Fields::from(vec![
2933            Field::new(
2934                "city",
2935                ArrowDataType::Dictionary(
2936                    Box::new(ArrowDataType::UInt8),
2937                    Box::new(ArrowDataType::Utf8),
2938                ),
2939                true,
2940            ),
2941            Field::new("name", ArrowDataType::Utf8, true),
2942        ]);
2943        let schema = Arc::new(Schema::new(vec![Field::new(
2944            "items",
2945            ArrowDataType::Struct(struct_fields.clone()),
2946            true,
2947        )]));
2948
2949        let items_arr = StructArray::new(
2950            struct_fields,
2951            vec![
2952                Arc::new(DictionaryArray::new(
2953                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2954                    Arc::new(StringArray::from_iter_values(vec![
2955                        "quebec",
2956                        "fredericton",
2957                        "halifax",
2958                    ])),
2959                )),
2960                Arc::new(StringArray::from_iter_values(vec![
2961                    "albert", "terry", "lance", "", "tim",
2962                ])),
2963            ],
2964            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2965        );
2966
2967        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2968        let mut buffer = Vec::with_capacity(1024);
2969        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2970        writer.write(&batch).unwrap();
2971        writer.close().unwrap();
2972        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2973            .unwrap()
2974            .collect::<Result<Vec<_>, _>>()
2975            .unwrap();
2976
2977        assert_eq!(read.len(), 1);
2978        assert_eq!(&batch, &read[0])
2979    }
2980
2981    /// Parameters for single_column_reader_test
2982    #[derive(Clone)]
2983    struct TestOptions {
2984        /// Number of row group to write to parquet (row group size =
2985        /// num_row_groups / num_rows)
2986        num_row_groups: usize,
2987        /// Total number of rows per row group
2988        num_rows: usize,
2989        /// Size of batches to read back
2990        record_batch_size: usize,
2991        /// Percentage of nulls in column or None if required
2992        null_percent: Option<usize>,
2993        /// Set write batch size
2994        ///
2995        /// This is the number of rows that are written at once to a page and
2996        /// therefore acts as a bound on the page granularity of a row group
2997        write_batch_size: usize,
2998        /// Maximum size of page in bytes
2999        max_data_page_size: usize,
3000        /// Maximum size of dictionary page in bytes
3001        max_dict_page_size: usize,
3002        /// Writer version
3003        writer_version: WriterVersion,
3004        /// Enabled statistics
3005        enabled_statistics: EnabledStatistics,
3006        /// Encoding
3007        encoding: Encoding,
3008        /// row selections and total selected row count
3009        row_selections: Option<(RowSelection, usize)>,
3010        /// row filter
3011        row_filter: Option<Vec<bool>>,
3012        /// limit
3013        limit: Option<usize>,
3014        /// offset
3015        offset: Option<usize>,
3016    }
3017
3018    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
3019    impl std::fmt::Debug for TestOptions {
3020        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3021            f.debug_struct("TestOptions")
3022                .field("num_row_groups", &self.num_row_groups)
3023                .field("num_rows", &self.num_rows)
3024                .field("record_batch_size", &self.record_batch_size)
3025                .field("null_percent", &self.null_percent)
3026                .field("write_batch_size", &self.write_batch_size)
3027                .field("max_data_page_size", &self.max_data_page_size)
3028                .field("max_dict_page_size", &self.max_dict_page_size)
3029                .field("writer_version", &self.writer_version)
3030                .field("enabled_statistics", &self.enabled_statistics)
3031                .field("encoding", &self.encoding)
3032                .field("row_selections", &self.row_selections.is_some())
3033                .field("row_filter", &self.row_filter.is_some())
3034                .field("limit", &self.limit)
3035                .field("offset", &self.offset)
3036                .finish()
3037        }
3038    }
3039
3040    impl Default for TestOptions {
3041        fn default() -> Self {
3042            Self {
3043                num_row_groups: 2,
3044                num_rows: 100,
3045                record_batch_size: 15,
3046                null_percent: None,
3047                write_batch_size: 64,
3048                max_data_page_size: 1024 * 1024,
3049                max_dict_page_size: 1024 * 1024,
3050                writer_version: WriterVersion::PARQUET_1_0,
3051                enabled_statistics: EnabledStatistics::Page,
3052                encoding: Encoding::PLAIN,
3053                row_selections: None,
3054                row_filter: None,
3055                limit: None,
3056                offset: None,
3057            }
3058        }
3059    }
3060
3061    impl TestOptions {
3062        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3063            Self {
3064                num_row_groups,
3065                num_rows,
3066                record_batch_size,
3067                ..Default::default()
3068            }
3069        }
3070
3071        fn with_null_percent(self, null_percent: usize) -> Self {
3072            Self {
3073                null_percent: Some(null_percent),
3074                ..self
3075            }
3076        }
3077
3078        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3079            Self {
3080                max_data_page_size,
3081                ..self
3082            }
3083        }
3084
3085        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3086            Self {
3087                max_dict_page_size,
3088                ..self
3089            }
3090        }
3091
3092        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3093            Self {
3094                enabled_statistics,
3095                ..self
3096            }
3097        }
3098
3099        fn with_row_selections(self) -> Self {
3100            assert!(self.row_filter.is_none(), "Must set row selection first");
3101
3102            let mut rng = rng();
3103            let step = rng.random_range(self.record_batch_size..self.num_rows);
3104            let row_selections = create_test_selection(
3105                step,
3106                self.num_row_groups * self.num_rows,
3107                rng.random::<bool>(),
3108            );
3109            Self {
3110                row_selections: Some(row_selections),
3111                ..self
3112            }
3113        }
3114
3115        fn with_row_filter(self) -> Self {
3116            let row_count = match &self.row_selections {
3117                Some((_, count)) => *count,
3118                None => self.num_row_groups * self.num_rows,
3119            };
3120
3121            let mut rng = rng();
3122            Self {
3123                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3124                ..self
3125            }
3126        }
3127
3128        fn with_limit(self, limit: usize) -> Self {
3129            Self {
3130                limit: Some(limit),
3131                ..self
3132            }
3133        }
3134
3135        fn with_offset(self, offset: usize) -> Self {
3136            Self {
3137                offset: Some(offset),
3138                ..self
3139            }
3140        }
3141
3142        fn writer_props(&self) -> WriterProperties {
3143            let builder = WriterProperties::builder()
3144                .set_data_page_size_limit(self.max_data_page_size)
3145                .set_write_batch_size(self.write_batch_size)
3146                .set_writer_version(self.writer_version)
3147                .set_statistics_enabled(self.enabled_statistics);
3148
3149            let builder = match self.encoding {
3150                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3151                    .set_dictionary_enabled(true)
3152                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3153                _ => builder
3154                    .set_dictionary_enabled(false)
3155                    .set_encoding(self.encoding),
3156            };
3157
3158            builder.build()
3159        }
3160    }
3161
3162    /// Create a parquet file and then read it using
3163    /// `ParquetFileArrowReader` using a standard set of parameters
3164    /// `opts`.
3165    ///
3166    /// `rand_max` represents the maximum size of value to pass to to
3167    /// value generator
3168    fn run_single_column_reader_tests<T, F, G>(
3169        rand_max: i32,
3170        converted_type: ConvertedType,
3171        arrow_type: Option<ArrowDataType>,
3172        converter: F,
3173        encodings: &[Encoding],
3174    ) where
3175        T: DataType,
3176        G: RandGen<T>,
3177        F: Fn(&[Option<T::T>]) -> ArrayRef,
3178    {
3179        let all_options = vec![
3180            // choose record_batch_batch (15) so batches cross row
3181            // group boundaries (50 rows in 2 row groups) cases.
3182            TestOptions::new(2, 100, 15),
3183            // choose record_batch_batch (5) so batches sometime fall
3184            // on row group boundaries and (25 rows in 3 row groups
3185            // --> row groups of 10, 10, and 5). Tests buffer
3186            // refilling edge cases.
3187            TestOptions::new(3, 25, 5),
3188            // Choose record_batch_size (25) so all batches fall
3189            // exactly on row group boundary (25). Tests buffer
3190            // refilling edge cases.
3191            TestOptions::new(4, 100, 25),
3192            // Set maximum page size so row groups have multiple pages
3193            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3194            // Set small dictionary page size to test dictionary fallback
3195            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3196            // Test optional but with no nulls
3197            TestOptions::new(2, 256, 127).with_null_percent(0),
3198            // Test optional with nulls
3199            TestOptions::new(2, 256, 93).with_null_percent(25),
3200            // Test with limit of 0
3201            TestOptions::new(4, 100, 25).with_limit(0),
3202            // Test with limit of 50
3203            TestOptions::new(4, 100, 25).with_limit(50),
3204            // Test with limit equal to number of rows
3205            TestOptions::new(4, 100, 25).with_limit(10),
3206            // Test with limit larger than number of rows
3207            TestOptions::new(4, 100, 25).with_limit(101),
3208            // Test with limit + offset equal to number of rows
3209            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3210            // Test with limit + offset equal to number of rows
3211            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3212            // Test with limit + offset larger than number of rows
3213            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3214            // Test with no page-level statistics
3215            TestOptions::new(2, 256, 91)
3216                .with_null_percent(25)
3217                .with_enabled_statistics(EnabledStatistics::Chunk),
3218            // Test with no statistics
3219            TestOptions::new(2, 256, 91)
3220                .with_null_percent(25)
3221                .with_enabled_statistics(EnabledStatistics::None),
3222            // Test with all null
3223            TestOptions::new(2, 128, 91)
3224                .with_null_percent(100)
3225                .with_enabled_statistics(EnabledStatistics::None),
3226            // Test skip
3227
3228            // choose record_batch_batch (15) so batches cross row
3229            // group boundaries (50 rows in 2 row groups) cases.
3230            TestOptions::new(2, 100, 15).with_row_selections(),
3231            // choose record_batch_batch (5) so batches sometime fall
3232            // on row group boundaries and (25 rows in 3 row groups
3233            // --> row groups of 10, 10, and 5). Tests buffer
3234            // refilling edge cases.
3235            TestOptions::new(3, 25, 5).with_row_selections(),
3236            // Choose record_batch_size (25) so all batches fall
3237            // exactly on row group boundary (25). Tests buffer
3238            // refilling edge cases.
3239            TestOptions::new(4, 100, 25).with_row_selections(),
3240            // Set maximum page size so row groups have multiple pages
3241            TestOptions::new(3, 256, 73)
3242                .with_max_data_page_size(128)
3243                .with_row_selections(),
3244            // Set small dictionary page size to test dictionary fallback
3245            TestOptions::new(3, 256, 57)
3246                .with_max_dict_page_size(128)
3247                .with_row_selections(),
3248            // Test optional but with no nulls
3249            TestOptions::new(2, 256, 127)
3250                .with_null_percent(0)
3251                .with_row_selections(),
3252            // Test optional with nulls
3253            TestOptions::new(2, 256, 93)
3254                .with_null_percent(25)
3255                .with_row_selections(),
3256            // Test optional with nulls
3257            TestOptions::new(2, 256, 93)
3258                .with_null_percent(25)
3259                .with_row_selections()
3260                .with_limit(10),
3261            // Test optional with nulls
3262            TestOptions::new(2, 256, 93)
3263                .with_null_percent(25)
3264                .with_row_selections()
3265                .with_offset(20)
3266                .with_limit(10),
3267            // Test filter
3268
3269            // Test with row filter
3270            TestOptions::new(4, 100, 25).with_row_filter(),
3271            // Test with row selection and row filter
3272            TestOptions::new(4, 100, 25)
3273                .with_row_selections()
3274                .with_row_filter(),
3275            // Test with nulls and row filter
3276            TestOptions::new(2, 256, 93)
3277                .with_null_percent(25)
3278                .with_max_data_page_size(10)
3279                .with_row_filter(),
3280            // Test with nulls and row filter and small pages
3281            TestOptions::new(2, 256, 93)
3282                .with_null_percent(25)
3283                .with_max_data_page_size(10)
3284                .with_row_selections()
3285                .with_row_filter(),
3286            // Test with row selection and no offset index and small pages
3287            TestOptions::new(2, 256, 93)
3288                .with_enabled_statistics(EnabledStatistics::None)
3289                .with_max_data_page_size(10)
3290                .with_row_selections(),
3291        ];
3292
3293        all_options.into_iter().for_each(|opts| {
3294            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3295                for encoding in encodings {
3296                    let opts = TestOptions {
3297                        writer_version,
3298                        encoding: *encoding,
3299                        ..opts.clone()
3300                    };
3301
3302                    single_column_reader_test::<T, _, G>(
3303                        opts,
3304                        rand_max,
3305                        converted_type,
3306                        arrow_type.clone(),
3307                        &converter,
3308                    )
3309                }
3310            }
3311        });
3312    }
3313
3314    /// Create a parquet file and then read it using
3315    /// `ParquetFileArrowReader` using the parameters described in
3316    /// `opts`.
3317    fn single_column_reader_test<T, F, G>(
3318        opts: TestOptions,
3319        rand_max: i32,
3320        converted_type: ConvertedType,
3321        arrow_type: Option<ArrowDataType>,
3322        converter: F,
3323    ) where
3324        T: DataType,
3325        G: RandGen<T>,
3326        F: Fn(&[Option<T::T>]) -> ArrayRef,
3327    {
3328        // Print out options to facilitate debugging failures on CI
3329        println!(
3330            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3331            T::get_physical_type(),
3332            converted_type,
3333            arrow_type,
3334            opts
3335        );
3336
3337        //according to null_percent generate def_levels
3338        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3339            Some(null_percent) => {
3340                let mut rng = rng();
3341
3342                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3343                    .map(|_| {
3344                        std::iter::from_fn(|| {
3345                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3346                        })
3347                        .take(opts.num_rows)
3348                        .collect()
3349                    })
3350                    .collect();
3351                (Repetition::OPTIONAL, Some(def_levels))
3352            }
3353            None => (Repetition::REQUIRED, None),
3354        };
3355
3356        //generate random table data
3357        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3358            .map(|idx| {
3359                let null_count = match def_levels.as_ref() {
3360                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3361                    None => 0,
3362                };
3363                G::gen_vec(rand_max, opts.num_rows - null_count)
3364            })
3365            .collect();
3366
3367        let len = match T::get_physical_type() {
3368            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3369            crate::basic::Type::INT96 => 12,
3370            _ => -1,
3371        };
3372
3373        let fields = vec![Arc::new(
3374            Type::primitive_type_builder("leaf", T::get_physical_type())
3375                .with_repetition(repetition)
3376                .with_converted_type(converted_type)
3377                .with_length(len)
3378                .build()
3379                .unwrap(),
3380        )];
3381
3382        let schema = Arc::new(
3383            Type::group_type_builder("test_schema")
3384                .with_fields(fields)
3385                .build()
3386                .unwrap(),
3387        );
3388
3389        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3390
3391        let mut file = tempfile::tempfile().unwrap();
3392
3393        generate_single_column_file_with_data::<T>(
3394            &values,
3395            def_levels.as_ref(),
3396            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3397            schema,
3398            arrow_field,
3399            &opts,
3400        )
3401        .unwrap();
3402
3403        file.rewind().unwrap();
3404
3405        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3406            opts.enabled_statistics == EnabledStatistics::Page,
3407        ));
3408
3409        let mut builder =
3410            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3411
3412        let expected_data = match opts.row_selections {
3413            Some((selections, row_count)) => {
3414                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3415
3416                let mut skip_data: Vec<Option<T::T>> = vec![];
3417                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3418                for select in dequeue {
3419                    if select.skip {
3420                        without_skip_data.drain(0..select.row_count);
3421                    } else {
3422                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3423                    }
3424                }
3425                builder = builder.with_row_selection(selections);
3426
3427                assert_eq!(skip_data.len(), row_count);
3428                skip_data
3429            }
3430            None => {
3431                //get flatten table data
3432                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3433                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3434                expected_data
3435            }
3436        };
3437
3438        let mut expected_data = match opts.row_filter {
3439            Some(filter) => {
3440                let expected_data = expected_data
3441                    .into_iter()
3442                    .zip(filter.iter())
3443                    .filter_map(|(d, f)| f.then(|| d))
3444                    .collect();
3445
3446                let mut filter_offset = 0;
3447                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3448                    ProjectionMask::all(),
3449                    move |b| {
3450                        let array = BooleanArray::from_iter(
3451                            filter
3452                                .iter()
3453                                .skip(filter_offset)
3454                                .take(b.num_rows())
3455                                .map(|x| Some(*x)),
3456                        );
3457                        filter_offset += b.num_rows();
3458                        Ok(array)
3459                    },
3460                ))]);
3461
3462                builder = builder.with_row_filter(filter);
3463                expected_data
3464            }
3465            None => expected_data,
3466        };
3467
3468        if let Some(offset) = opts.offset {
3469            builder = builder.with_offset(offset);
3470            expected_data = expected_data.into_iter().skip(offset).collect();
3471        }
3472
3473        if let Some(limit) = opts.limit {
3474            builder = builder.with_limit(limit);
3475            expected_data = expected_data.into_iter().take(limit).collect();
3476        }
3477
3478        let mut record_reader = builder
3479            .with_batch_size(opts.record_batch_size)
3480            .build()
3481            .unwrap();
3482
3483        let mut total_read = 0;
3484        loop {
3485            let maybe_batch = record_reader.next();
3486            if total_read < expected_data.len() {
3487                let end = min(total_read + opts.record_batch_size, expected_data.len());
3488                let batch = maybe_batch.unwrap().unwrap();
3489                assert_eq!(end - total_read, batch.num_rows());
3490
3491                let a = converter(&expected_data[total_read..end]);
3492                let b = batch.column(0);
3493
3494                assert_eq!(a.data_type(), b.data_type());
3495                assert_eq!(a.to_data(), b.to_data());
3496                assert_eq!(
3497                    a.as_any().type_id(),
3498                    b.as_any().type_id(),
3499                    "incorrect type ids"
3500                );
3501
3502                total_read = end;
3503            } else {
3504                assert!(maybe_batch.is_none());
3505                break;
3506            }
3507        }
3508    }
3509
3510    fn gen_expected_data<T: DataType>(
3511        def_levels: Option<&Vec<Vec<i16>>>,
3512        values: &[Vec<T::T>],
3513    ) -> Vec<Option<T::T>> {
3514        let data: Vec<Option<T::T>> = match def_levels {
3515            Some(levels) => {
3516                let mut values_iter = values.iter().flatten();
3517                levels
3518                    .iter()
3519                    .flatten()
3520                    .map(|d| match d {
3521                        1 => Some(values_iter.next().cloned().unwrap()),
3522                        0 => None,
3523                        _ => unreachable!(),
3524                    })
3525                    .collect()
3526            }
3527            None => values.iter().flatten().cloned().map(Some).collect(),
3528        };
3529        data
3530    }
3531
3532    fn generate_single_column_file_with_data<T: DataType>(
3533        values: &[Vec<T::T>],
3534        def_levels: Option<&Vec<Vec<i16>>>,
3535        file: File,
3536        schema: TypePtr,
3537        field: Option<Field>,
3538        opts: &TestOptions,
3539    ) -> Result<ParquetMetaData> {
3540        let mut writer_props = opts.writer_props();
3541        if let Some(field) = field {
3542            let arrow_schema = Schema::new(vec![field]);
3543            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3544        }
3545
3546        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3547
3548        for (idx, v) in values.iter().enumerate() {
3549            let def_levels = def_levels.map(|d| d[idx].as_slice());
3550            let mut row_group_writer = writer.next_row_group()?;
3551            {
3552                let mut column_writer = row_group_writer
3553                    .next_column()?
3554                    .expect("Column writer is none!");
3555
3556                column_writer
3557                    .typed::<T>()
3558                    .write_batch(v, def_levels, None)?;
3559
3560                column_writer.close()?;
3561            }
3562            row_group_writer.close()?;
3563        }
3564
3565        writer.close()
3566    }
3567
3568    fn get_test_file(file_name: &str) -> File {
3569        let mut path = PathBuf::new();
3570        path.push(arrow::util::test_util::arrow_test_data());
3571        path.push(file_name);
3572
3573        File::open(path.as_path()).expect("File not found!")
3574    }
3575
3576    #[test]
3577    fn test_read_structs() {
3578        // This particular test file has columns of struct types where there is
3579        // a column that has the same name as one of the struct fields
3580        // (see: ARROW-11452)
3581        let testdata = arrow::util::test_util::parquet_test_data();
3582        let path = format!("{testdata}/nested_structs.rust.parquet");
3583        let file = File::open(&path).unwrap();
3584        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3585
3586        for batch in record_batch_reader {
3587            batch.unwrap();
3588        }
3589
3590        let file = File::open(&path).unwrap();
3591        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3592
3593        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3594        let projected_reader = builder
3595            .with_projection(mask)
3596            .with_batch_size(60)
3597            .build()
3598            .unwrap();
3599
3600        let expected_schema = Schema::new(vec![
3601            Field::new(
3602                "roll_num",
3603                ArrowDataType::Struct(Fields::from(vec![Field::new(
3604                    "count",
3605                    ArrowDataType::UInt64,
3606                    false,
3607                )])),
3608                false,
3609            ),
3610            Field::new(
3611                "PC_CUR",
3612                ArrowDataType::Struct(Fields::from(vec![
3613                    Field::new("mean", ArrowDataType::Int64, false),
3614                    Field::new("sum", ArrowDataType::Int64, false),
3615                ])),
3616                false,
3617            ),
3618        ]);
3619
3620        // Tests for #1652 and #1654
3621        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3622
3623        for batch in projected_reader {
3624            let batch = batch.unwrap();
3625            assert_eq!(batch.schema().as_ref(), &expected_schema);
3626        }
3627    }
3628
3629    #[test]
3630    // same as test_read_structs but constructs projection mask via column names
3631    fn test_read_structs_by_name() {
3632        let testdata = arrow::util::test_util::parquet_test_data();
3633        let path = format!("{testdata}/nested_structs.rust.parquet");
3634        let file = File::open(&path).unwrap();
3635        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3636
3637        for batch in record_batch_reader {
3638            batch.unwrap();
3639        }
3640
3641        let file = File::open(&path).unwrap();
3642        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3643
3644        let mask = ProjectionMask::columns(
3645            builder.parquet_schema(),
3646            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3647        );
3648        let projected_reader = builder
3649            .with_projection(mask)
3650            .with_batch_size(60)
3651            .build()
3652            .unwrap();
3653
3654        let expected_schema = Schema::new(vec![
3655            Field::new(
3656                "roll_num",
3657                ArrowDataType::Struct(Fields::from(vec![Field::new(
3658                    "count",
3659                    ArrowDataType::UInt64,
3660                    false,
3661                )])),
3662                false,
3663            ),
3664            Field::new(
3665                "PC_CUR",
3666                ArrowDataType::Struct(Fields::from(vec![
3667                    Field::new("mean", ArrowDataType::Int64, false),
3668                    Field::new("sum", ArrowDataType::Int64, false),
3669                ])),
3670                false,
3671            ),
3672        ]);
3673
3674        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3675
3676        for batch in projected_reader {
3677            let batch = batch.unwrap();
3678            assert_eq!(batch.schema().as_ref(), &expected_schema);
3679        }
3680    }
3681
3682    #[test]
3683    fn test_read_maps() {
3684        let testdata = arrow::util::test_util::parquet_test_data();
3685        let path = format!("{testdata}/nested_maps.snappy.parquet");
3686        let file = File::open(path).unwrap();
3687        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3688
3689        for batch in record_batch_reader {
3690            batch.unwrap();
3691        }
3692    }
3693
3694    // test that we can handle the UNKNOWN logical type annotation on any physical type
3695    #[test]
3696    fn test_unknown_logical_type() {
3697        let message_type = "message uk {
3698            OPTIONAL INT32 uki32 (UNKNOWN);
3699            OPTIONAL INT64 uki64 (UNKNOWN);
3700            OPTIONAL INT96 uki96 (UNKNOWN);
3701            OPTIONAL BOOLEAN ukbool (UNKNOWN);
3702            OPTIONAL FLOAT ukfloat (UNKNOWN);
3703            OPTIONAL DOUBLE ukdbl (UNKNOWN);
3704            OPTIONAL BYTE_ARRAY ukbytes (UNKNOWN);
3705            OPTIONAL FIXED_LEN_BYTE_ARRAY(10) ukflba (UNKNOWN);
3706        }";
3707
3708        let schema = Arc::new(parse_message_type(message_type).unwrap());
3709        let file = tempfile::tempfile().unwrap();
3710
3711        let mut writer =
3712            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3713                .unwrap();
3714
3715        let mut row_group_writer = writer.next_row_group().unwrap();
3716
3717        fn write_nulls<T: DataType>(row_group_writer: &mut SerializedRowGroupWriter<'_, File>) {
3718            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3719            // write out a bunch of nulls
3720            column_writer
3721                .typed::<T>()
3722                .write_batch(&[], Some(&[0, 0, 0, 0]), None)
3723                .unwrap();
3724            column_writer.close().unwrap();
3725        }
3726
3727        // INT32
3728        write_nulls::<Int32Type>(&mut row_group_writer);
3729
3730        // INT64
3731        write_nulls::<Int64Type>(&mut row_group_writer);
3732
3733        // INT96
3734        write_nulls::<Int96Type>(&mut row_group_writer);
3735
3736        // BOOLEAN
3737        write_nulls::<BoolType>(&mut row_group_writer);
3738
3739        // FLOAT
3740        write_nulls::<FloatType>(&mut row_group_writer);
3741
3742        // DOUBLE
3743        write_nulls::<DoubleType>(&mut row_group_writer);
3744
3745        // BYTE_ARRAY
3746        write_nulls::<ByteArrayType>(&mut row_group_writer);
3747
3748        // FIXED_LEN_BYTE_ARRAY
3749        write_nulls::<FixedLenByteArrayType>(&mut row_group_writer);
3750
3751        row_group_writer.close().unwrap();
3752
3753        writer.close().unwrap();
3754
3755        let mut reader = ParquetRecordBatchReader::try_new(file, 4).unwrap();
3756        let batch = reader.next().unwrap().unwrap();
3757
3758        for col in batch.columns() {
3759            assert_eq!(col.len(), 4);
3760            assert_eq!(col.logical_null_count(), 4);
3761            assert_eq!(*col.data_type(), ArrowDataType::Null);
3762        }
3763    }
3764
3765    #[test]
3766    fn test_nested_nullability() {
3767        let message_type = "message nested {
3768          OPTIONAL Group group {
3769            REQUIRED INT32 leaf;
3770          }
3771        }";
3772
3773        let file = tempfile::tempfile().unwrap();
3774        let schema = Arc::new(parse_message_type(message_type).unwrap());
3775
3776        {
3777            // Write using low-level parquet API (#1167)
3778            let mut writer =
3779                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3780                    .unwrap();
3781
3782            {
3783                let mut row_group_writer = writer.next_row_group().unwrap();
3784                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3785
3786                column_writer
3787                    .typed::<Int32Type>()
3788                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3789                    .unwrap();
3790
3791                column_writer.close().unwrap();
3792                row_group_writer.close().unwrap();
3793            }
3794
3795            writer.close().unwrap();
3796        }
3797
3798        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3799        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3800
3801        let reader = builder.with_projection(mask).build().unwrap();
3802
3803        let expected_schema = Schema::new(vec![Field::new(
3804            "group",
3805            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3806            true,
3807        )]);
3808
3809        let batch = reader.into_iter().next().unwrap().unwrap();
3810        assert_eq!(batch.schema().as_ref(), &expected_schema);
3811        assert_eq!(batch.num_rows(), 4);
3812        assert_eq!(batch.column(0).null_count(), 2);
3813    }
3814
3815    #[test]
3816    fn test_dictionary_preservation() {
3817        let fields = vec![Arc::new(
3818            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3819                .with_repetition(Repetition::OPTIONAL)
3820                .with_converted_type(ConvertedType::UTF8)
3821                .build()
3822                .unwrap(),
3823        )];
3824
3825        let schema = Arc::new(
3826            Type::group_type_builder("test_schema")
3827                .with_fields(fields)
3828                .build()
3829                .unwrap(),
3830        );
3831
3832        let dict_type = ArrowDataType::Dictionary(
3833            Box::new(ArrowDataType::Int32),
3834            Box::new(ArrowDataType::Utf8),
3835        );
3836
3837        let arrow_field = Field::new("leaf", dict_type, true);
3838
3839        let mut file = tempfile::tempfile().unwrap();
3840
3841        let values = vec![
3842            vec![
3843                ByteArray::from("hello"),
3844                ByteArray::from("a"),
3845                ByteArray::from("b"),
3846                ByteArray::from("d"),
3847            ],
3848            vec![
3849                ByteArray::from("c"),
3850                ByteArray::from("a"),
3851                ByteArray::from("b"),
3852            ],
3853        ];
3854
3855        let def_levels = vec![
3856            vec![1, 0, 0, 1, 0, 0, 1, 1],
3857            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3858        ];
3859
3860        let opts = TestOptions {
3861            encoding: Encoding::RLE_DICTIONARY,
3862            ..Default::default()
3863        };
3864
3865        generate_single_column_file_with_data::<ByteArrayType>(
3866            &values,
3867            Some(&def_levels),
3868            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3869            schema,
3870            Some(arrow_field),
3871            &opts,
3872        )
3873        .unwrap();
3874
3875        file.rewind().unwrap();
3876
3877        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3878
3879        let batches = record_reader
3880            .collect::<Result<Vec<RecordBatch>, _>>()
3881            .unwrap();
3882
3883        assert_eq!(batches.len(), 6);
3884        assert!(batches.iter().all(|x| x.num_columns() == 1));
3885
3886        let row_counts = batches
3887            .iter()
3888            .map(|x| (x.num_rows(), x.column(0).null_count()))
3889            .collect::<Vec<_>>();
3890
3891        assert_eq!(
3892            row_counts,
3893            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3894        );
3895
3896        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3897
3898        // First and second batch in same row group -> same dictionary
3899        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3900        // Third batch spans row group -> computed dictionary
3901        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3902        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3903        // Fourth, fifth and sixth from same row group -> same dictionary
3904        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3905        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3906    }
3907
3908    #[test]
3909    fn test_read_null_list() {
3910        let testdata = arrow::util::test_util::parquet_test_data();
3911        let path = format!("{testdata}/null_list.parquet");
3912        let file = File::open(path).unwrap();
3913        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3914
3915        let batch = record_batch_reader.next().unwrap().unwrap();
3916        assert_eq!(batch.num_rows(), 1);
3917        assert_eq!(batch.num_columns(), 1);
3918        assert_eq!(batch.column(0).len(), 1);
3919
3920        let list = batch
3921            .column(0)
3922            .as_any()
3923            .downcast_ref::<ListArray>()
3924            .unwrap();
3925        assert_eq!(list.len(), 1);
3926        assert!(list.is_valid(0));
3927
3928        let val = list.value(0);
3929        assert_eq!(val.len(), 0);
3930    }
3931
3932    #[test]
3933    fn test_null_schema_inference() {
3934        let testdata = arrow::util::test_util::parquet_test_data();
3935        let path = format!("{testdata}/null_list.parquet");
3936        let file = File::open(path).unwrap();
3937
3938        let arrow_field = Field::new(
3939            "emptylist",
3940            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3941            true,
3942        );
3943
3944        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3945        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3946        let schema = builder.schema();
3947        assert_eq!(schema.fields().len(), 1);
3948        assert_eq!(schema.field(0), &arrow_field);
3949    }
3950
3951    #[test]
3952    fn test_skip_metadata() {
3953        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3954        let field = Field::new("col", col.data_type().clone(), true);
3955
3956        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3957
3958        let metadata = [("key".to_string(), "value".to_string())]
3959            .into_iter()
3960            .collect();
3961
3962        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3963
3964        assert_ne!(schema_with_metadata, schema_without_metadata);
3965
3966        let batch =
3967            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3968
3969        let file = |version: WriterVersion| {
3970            let props = WriterProperties::builder()
3971                .set_writer_version(version)
3972                .build();
3973
3974            let file = tempfile().unwrap();
3975            let mut writer =
3976                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3977                    .unwrap();
3978            writer.write(&batch).unwrap();
3979            writer.close().unwrap();
3980            file
3981        };
3982
3983        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3984
3985        let v1_reader = file(WriterVersion::PARQUET_1_0);
3986        let v2_reader = file(WriterVersion::PARQUET_2_0);
3987
3988        let arrow_reader =
3989            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3990        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3991
3992        let reader =
3993            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3994                .unwrap()
3995                .build()
3996                .unwrap();
3997        assert_eq!(reader.schema(), schema_without_metadata);
3998
3999        let arrow_reader =
4000            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
4001        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4002
4003        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4004            .unwrap()
4005            .build()
4006            .unwrap();
4007        assert_eq!(reader.schema(), schema_without_metadata);
4008    }
4009
4010    fn write_parquet_from_iter<I, F>(value: I) -> File
4011    where
4012        I: IntoIterator<Item = (F, ArrayRef)>,
4013        F: AsRef<str>,
4014    {
4015        let batch = RecordBatch::try_from_iter(value).unwrap();
4016        let file = tempfile().unwrap();
4017        let mut writer =
4018            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4019        writer.write(&batch).unwrap();
4020        writer.close().unwrap();
4021        file
4022    }
4023
4024    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4025    where
4026        I: IntoIterator<Item = (F, ArrayRef)>,
4027        F: AsRef<str>,
4028    {
4029        let file = write_parquet_from_iter(value);
4030        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4031        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4032            file.try_clone().unwrap(),
4033            options_with_schema,
4034        );
4035        assert_eq!(builder.err().unwrap().to_string(), expected_error);
4036    }
4037
4038    #[test]
4039    fn test_schema_too_few_columns() {
4040        run_schema_test_with_error(
4041            vec![
4042                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4043                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4044            ],
4045            Arc::new(Schema::new(vec![Field::new(
4046                "int64",
4047                ArrowDataType::Int64,
4048                false,
4049            )])),
4050            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4051        );
4052    }
4053
4054    #[test]
4055    fn test_schema_too_many_columns() {
4056        run_schema_test_with_error(
4057            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4058            Arc::new(Schema::new(vec![
4059                Field::new("int64", ArrowDataType::Int64, false),
4060                Field::new("int32", ArrowDataType::Int32, false),
4061            ])),
4062            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4063        );
4064    }
4065
4066    #[test]
4067    fn test_schema_mismatched_column_names() {
4068        run_schema_test_with_error(
4069            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4070            Arc::new(Schema::new(vec![Field::new(
4071                "other",
4072                ArrowDataType::Int64,
4073                false,
4074            )])),
4075            "Arrow: incompatible arrow schema, expected field named int64 got other",
4076        );
4077    }
4078
4079    #[test]
4080    fn test_schema_incompatible_columns() {
4081        run_schema_test_with_error(
4082            vec![
4083                (
4084                    "col1_invalid",
4085                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4086                ),
4087                (
4088                    "col2_valid",
4089                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4090                ),
4091                (
4092                    "col3_invalid",
4093                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4094                ),
4095            ],
4096            Arc::new(Schema::new(vec![
4097                Field::new("col1_invalid", ArrowDataType::Int32, false),
4098                Field::new("col2_valid", ArrowDataType::Int32, false),
4099                Field::new("col3_invalid", ArrowDataType::Int32, false),
4100            ])),
4101            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4102        );
4103    }
4104
4105    #[test]
4106    fn test_one_incompatible_nested_column() {
4107        let nested_fields = Fields::from(vec![
4108            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4109            Field::new("nested1_invalid", ArrowDataType::Int64, false),
4110        ]);
4111        let nested = StructArray::try_new(
4112            nested_fields,
4113            vec![
4114                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4115                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4116            ],
4117            None,
4118        )
4119        .expect("struct array");
4120        let supplied_nested_fields = Fields::from(vec![
4121            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4122            Field::new("nested1_invalid", ArrowDataType::Int32, false),
4123        ]);
4124        run_schema_test_with_error(
4125            vec![
4126                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4127                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4128                ("nested", Arc::new(nested) as ArrayRef),
4129            ],
4130            Arc::new(Schema::new(vec![
4131                Field::new("col1", ArrowDataType::Int64, false),
4132                Field::new("col2", ArrowDataType::Int32, false),
4133                Field::new(
4134                    "nested",
4135                    ArrowDataType::Struct(supplied_nested_fields),
4136                    false,
4137                ),
4138            ])),
4139            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4140            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4141            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4142        );
4143    }
4144
4145    /// Return parquet data with a single column of utf8 strings
4146    fn utf8_parquet() -> Bytes {
4147        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4148        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4149        let props = None;
4150        // write parquet file with non nullable strings
4151        let mut parquet_data = vec![];
4152        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4153        writer.write(&batch).unwrap();
4154        writer.close().unwrap();
4155        Bytes::from(parquet_data)
4156    }
4157
4158    #[test]
4159    fn test_schema_error_bad_types() {
4160        // verify incompatible schemas error on read
4161        let parquet_data = utf8_parquet();
4162
4163        // Ask to read it back with an incompatible schema (int vs string)
4164        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4165            "column1",
4166            arrow::datatypes::DataType::Int32,
4167            false,
4168        )]));
4169
4170        // read it back out
4171        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4172        let err =
4173            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4174                .unwrap_err();
4175        assert_eq!(
4176            err.to_string(),
4177            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4178        )
4179    }
4180
4181    #[test]
4182    fn test_schema_error_bad_nullability() {
4183        // verify incompatible schemas error on read
4184        let parquet_data = utf8_parquet();
4185
4186        // Ask to read it back with an incompatible schema (nullability mismatch)
4187        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4188            "column1",
4189            arrow::datatypes::DataType::Utf8,
4190            true,
4191        )]));
4192
4193        // read it back out
4194        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4195        let err =
4196            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4197                .unwrap_err();
4198        assert_eq!(
4199            err.to_string(),
4200            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4201        )
4202    }
4203
4204    #[test]
4205    fn test_read_binary_as_utf8() {
4206        let file = write_parquet_from_iter(vec![
4207            (
4208                "binary_to_utf8",
4209                Arc::new(BinaryArray::from(vec![
4210                    b"one".as_ref(),
4211                    b"two".as_ref(),
4212                    b"three".as_ref(),
4213                ])) as ArrayRef,
4214            ),
4215            (
4216                "large_binary_to_large_utf8",
4217                Arc::new(LargeBinaryArray::from(vec![
4218                    b"one".as_ref(),
4219                    b"two".as_ref(),
4220                    b"three".as_ref(),
4221                ])) as ArrayRef,
4222            ),
4223            (
4224                "binary_view_to_utf8_view",
4225                Arc::new(BinaryViewArray::from(vec![
4226                    b"one".as_ref(),
4227                    b"two".as_ref(),
4228                    b"three".as_ref(),
4229                ])) as ArrayRef,
4230            ),
4231        ]);
4232        let supplied_fields = Fields::from(vec![
4233            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4234            Field::new(
4235                "large_binary_to_large_utf8",
4236                ArrowDataType::LargeUtf8,
4237                false,
4238            ),
4239            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4240        ]);
4241
4242        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4243        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4244            file.try_clone().unwrap(),
4245            options,
4246        )
4247        .expect("reader builder with schema")
4248        .build()
4249        .expect("reader with schema");
4250
4251        let batch = arrow_reader.next().unwrap().unwrap();
4252        assert_eq!(batch.num_columns(), 3);
4253        assert_eq!(batch.num_rows(), 3);
4254        assert_eq!(
4255            batch
4256                .column(0)
4257                .as_string::<i32>()
4258                .iter()
4259                .collect::<Vec<_>>(),
4260            vec![Some("one"), Some("two"), Some("three")]
4261        );
4262
4263        assert_eq!(
4264            batch
4265                .column(1)
4266                .as_string::<i64>()
4267                .iter()
4268                .collect::<Vec<_>>(),
4269            vec![Some("one"), Some("two"), Some("three")]
4270        );
4271
4272        assert_eq!(
4273            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4274            vec![Some("one"), Some("two"), Some("three")]
4275        );
4276    }
4277
4278    #[test]
4279    #[should_panic(expected = "Invalid UTF8 sequence at")]
4280    fn test_read_non_utf8_binary_as_utf8() {
4281        let file = write_parquet_from_iter(vec![(
4282            "non_utf8_binary",
4283            Arc::new(BinaryArray::from(vec![
4284                b"\xDE\x00\xFF".as_ref(),
4285                b"\xDE\x01\xAA".as_ref(),
4286                b"\xDE\x02\xFF".as_ref(),
4287            ])) as ArrayRef,
4288        )]);
4289        let supplied_fields = Fields::from(vec![Field::new(
4290            "non_utf8_binary",
4291            ArrowDataType::Utf8,
4292            false,
4293        )]);
4294
4295        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4296        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4297            file.try_clone().unwrap(),
4298            options,
4299        )
4300        .expect("reader builder with schema")
4301        .build()
4302        .expect("reader with schema");
4303        arrow_reader.next().unwrap().unwrap_err();
4304    }
4305
4306    #[test]
4307    fn test_with_schema() {
4308        let nested_fields = Fields::from(vec![
4309            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4310            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4311        ]);
4312
4313        let nested_arrays: Vec<ArrayRef> = vec![
4314            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4315            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4316        ];
4317
4318        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4319
4320        let file = write_parquet_from_iter(vec![
4321            (
4322                "int32_to_ts_second",
4323                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4324            ),
4325            (
4326                "date32_to_date64",
4327                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4328            ),
4329            ("nested", Arc::new(nested) as ArrayRef),
4330        ]);
4331
4332        let supplied_nested_fields = Fields::from(vec![
4333            Field::new(
4334                "utf8_to_dict",
4335                ArrowDataType::Dictionary(
4336                    Box::new(ArrowDataType::Int32),
4337                    Box::new(ArrowDataType::Utf8),
4338                ),
4339                false,
4340            ),
4341            Field::new(
4342                "int64_to_ts_nano",
4343                ArrowDataType::Timestamp(
4344                    arrow::datatypes::TimeUnit::Nanosecond,
4345                    Some("+10:00".into()),
4346                ),
4347                false,
4348            ),
4349        ]);
4350
4351        let supplied_schema = Arc::new(Schema::new(vec![
4352            Field::new(
4353                "int32_to_ts_second",
4354                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4355                false,
4356            ),
4357            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4358            Field::new(
4359                "nested",
4360                ArrowDataType::Struct(supplied_nested_fields),
4361                false,
4362            ),
4363        ]));
4364
4365        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4366        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4367            file.try_clone().unwrap(),
4368            options,
4369        )
4370        .expect("reader builder with schema")
4371        .build()
4372        .expect("reader with schema");
4373
4374        assert_eq!(arrow_reader.schema(), supplied_schema);
4375        let batch = arrow_reader.next().unwrap().unwrap();
4376        assert_eq!(batch.num_columns(), 3);
4377        assert_eq!(batch.num_rows(), 4);
4378        assert_eq!(
4379            batch
4380                .column(0)
4381                .as_any()
4382                .downcast_ref::<TimestampSecondArray>()
4383                .expect("downcast to timestamp second")
4384                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4385                .map(|v| v.to_string())
4386                .expect("value as datetime"),
4387            "1970-01-01 01:00:00 +01:00"
4388        );
4389        assert_eq!(
4390            batch
4391                .column(1)
4392                .as_any()
4393                .downcast_ref::<Date64Array>()
4394                .expect("downcast to date64")
4395                .value_as_date(0)
4396                .map(|v| v.to_string())
4397                .expect("value as date"),
4398            "1970-01-01"
4399        );
4400
4401        let nested = batch
4402            .column(2)
4403            .as_any()
4404            .downcast_ref::<StructArray>()
4405            .expect("downcast to struct");
4406
4407        let nested_dict = nested
4408            .column(0)
4409            .as_any()
4410            .downcast_ref::<Int32DictionaryArray>()
4411            .expect("downcast to dictionary");
4412
4413        assert_eq!(
4414            nested_dict
4415                .values()
4416                .as_any()
4417                .downcast_ref::<StringArray>()
4418                .expect("downcast to string")
4419                .iter()
4420                .collect::<Vec<_>>(),
4421            vec![Some("a"), Some("b")]
4422        );
4423
4424        assert_eq!(
4425            nested_dict.keys().iter().collect::<Vec<_>>(),
4426            vec![Some(0), Some(0), Some(0), Some(1)]
4427        );
4428
4429        assert_eq!(
4430            nested
4431                .column(1)
4432                .as_any()
4433                .downcast_ref::<TimestampNanosecondArray>()
4434                .expect("downcast to timestamp nanosecond")
4435                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4436                .map(|v| v.to_string())
4437                .expect("value as datetime"),
4438            "1970-01-01 10:00:00.000000001 +10:00"
4439        );
4440    }
4441
4442    #[test]
4443    fn test_empty_projection() {
4444        let testdata = arrow::util::test_util::parquet_test_data();
4445        let path = format!("{testdata}/alltypes_plain.parquet");
4446        let file = File::open(path).unwrap();
4447
4448        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4449        let file_metadata = builder.metadata().file_metadata();
4450        let expected_rows = file_metadata.num_rows() as usize;
4451
4452        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4453        let batch_reader = builder
4454            .with_projection(mask)
4455            .with_batch_size(2)
4456            .build()
4457            .unwrap();
4458
4459        let mut total_rows = 0;
4460        for maybe_batch in batch_reader {
4461            let batch = maybe_batch.unwrap();
4462            total_rows += batch.num_rows();
4463            assert_eq!(batch.num_columns(), 0);
4464            assert!(batch.num_rows() <= 2);
4465        }
4466
4467        assert_eq!(total_rows, expected_rows);
4468    }
4469
4470    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4471        let schema = Arc::new(Schema::new(vec![Field::new(
4472            "list",
4473            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4474            true,
4475        )]));
4476
4477        let mut buf = Vec::with_capacity(1024);
4478
4479        let mut writer = ArrowWriter::try_new(
4480            &mut buf,
4481            schema.clone(),
4482            Some(
4483                WriterProperties::builder()
4484                    .set_max_row_group_row_count(Some(row_group_size))
4485                    .build(),
4486            ),
4487        )
4488        .unwrap();
4489        for _ in 0..2 {
4490            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4491            for _ in 0..(batch_size) {
4492                list_builder.append(true);
4493            }
4494            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4495                .unwrap();
4496            writer.write(&batch).unwrap();
4497        }
4498        writer.close().unwrap();
4499
4500        let mut record_reader =
4501            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4502        assert_eq!(
4503            batch_size,
4504            record_reader.next().unwrap().unwrap().num_rows()
4505        );
4506        assert_eq!(
4507            batch_size,
4508            record_reader.next().unwrap().unwrap().num_rows()
4509        );
4510    }
4511
4512    #[test]
4513    fn test_row_group_exact_multiple() {
4514        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4515        test_row_group_batch(8, 8);
4516        test_row_group_batch(10, 8);
4517        test_row_group_batch(8, 10);
4518        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4519        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4520        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4521        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4522        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4523    }
4524
4525    /// Given a RecordBatch containing all the column data, return the expected batches given
4526    /// a `batch_size` and `selection`
4527    fn get_expected_batches(
4528        column: &RecordBatch,
4529        selection: &RowSelection,
4530        batch_size: usize,
4531    ) -> Vec<RecordBatch> {
4532        let mut expected_batches = vec![];
4533
4534        let mut selection: VecDeque<_> = selection.clone().into();
4535        let mut row_offset = 0;
4536        let mut last_start = None;
4537        while row_offset < column.num_rows() && !selection.is_empty() {
4538            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4539            while batch_remaining > 0 && !selection.is_empty() {
4540                let (to_read, skip) = match selection.front_mut() {
4541                    Some(selection) if selection.row_count > batch_remaining => {
4542                        selection.row_count -= batch_remaining;
4543                        (batch_remaining, selection.skip)
4544                    }
4545                    Some(_) => {
4546                        let select = selection.pop_front().unwrap();
4547                        (select.row_count, select.skip)
4548                    }
4549                    None => break,
4550                };
4551
4552                batch_remaining -= to_read;
4553
4554                match skip {
4555                    true => {
4556                        if let Some(last_start) = last_start.take() {
4557                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4558                        }
4559                        row_offset += to_read
4560                    }
4561                    false => {
4562                        last_start.get_or_insert(row_offset);
4563                        row_offset += to_read
4564                    }
4565                }
4566            }
4567        }
4568
4569        if let Some(last_start) = last_start.take() {
4570            expected_batches.push(column.slice(last_start, row_offset - last_start))
4571        }
4572
4573        // Sanity check, all batches except the final should be the batch size
4574        for batch in &expected_batches[..expected_batches.len() - 1] {
4575            assert_eq!(batch.num_rows(), batch_size);
4576        }
4577
4578        expected_batches
4579    }
4580
4581    fn create_test_selection(
4582        step_len: usize,
4583        total_len: usize,
4584        skip_first: bool,
4585    ) -> (RowSelection, usize) {
4586        let mut remaining = total_len;
4587        let mut skip = skip_first;
4588        let mut vec = vec![];
4589        let mut selected_count = 0;
4590        while remaining != 0 {
4591            let step = if remaining > step_len {
4592                step_len
4593            } else {
4594                remaining
4595            };
4596            vec.push(RowSelector {
4597                row_count: step,
4598                skip,
4599            });
4600            remaining -= step;
4601            if !skip {
4602                selected_count += step;
4603            }
4604            skip = !skip;
4605        }
4606        (vec.into(), selected_count)
4607    }
4608
4609    #[test]
4610    fn test_scan_row_with_selection() {
4611        let testdata = arrow::util::test_util::parquet_test_data();
4612        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4613        let test_file = File::open(&path).unwrap();
4614
4615        let mut serial_reader =
4616            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4617        let data = serial_reader.next().unwrap().unwrap();
4618
4619        let do_test = |batch_size: usize, selection_len: usize| {
4620            for skip_first in [false, true] {
4621                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4622
4623                let expected = get_expected_batches(&data, &selections, batch_size);
4624                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4625                assert_eq!(
4626                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4627                    expected,
4628                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4629                );
4630            }
4631        };
4632
4633        // total row count 7300
4634        // 1. test selection len more than one page row count
4635        do_test(1000, 1000);
4636
4637        // 2. test selection len less than one page row count
4638        do_test(20, 20);
4639
4640        // 3. test selection_len less than batch_size
4641        do_test(20, 5);
4642
4643        // 4. test selection_len more than batch_size
4644        // If batch_size < selection_len
4645        do_test(20, 5);
4646
4647        fn create_skip_reader(
4648            test_file: &File,
4649            batch_size: usize,
4650            selections: RowSelection,
4651        ) -> ParquetRecordBatchReader {
4652            let options =
4653                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4654            let file = test_file.try_clone().unwrap();
4655            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4656                .unwrap()
4657                .with_batch_size(batch_size)
4658                .with_row_selection(selections)
4659                .build()
4660                .unwrap()
4661        }
4662    }
4663
4664    #[test]
4665    fn test_batch_size_overallocate() {
4666        let testdata = arrow::util::test_util::parquet_test_data();
4667        // `alltypes_plain.parquet` only have 8 rows
4668        let path = format!("{testdata}/alltypes_plain.parquet");
4669        let test_file = File::open(path).unwrap();
4670
4671        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4672        let num_rows = builder.metadata.file_metadata().num_rows();
4673        let reader = builder
4674            .with_batch_size(1024)
4675            .with_projection(ProjectionMask::all())
4676            .build()
4677            .unwrap();
4678        assert_ne!(1024, num_rows);
4679        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4680    }
4681
4682    #[test]
4683    fn test_read_with_page_index_enabled() {
4684        let testdata = arrow::util::test_util::parquet_test_data();
4685
4686        {
4687            // `alltypes_tiny_pages.parquet` has page index
4688            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4689            let test_file = File::open(path).unwrap();
4690            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4691                test_file,
4692                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4693            )
4694            .unwrap();
4695            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4696            let reader = builder.build().unwrap();
4697            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4698            assert_eq!(batches.len(), 8);
4699        }
4700
4701        {
4702            // `alltypes_plain.parquet` doesn't have page index
4703            let path = format!("{testdata}/alltypes_plain.parquet");
4704            let test_file = File::open(path).unwrap();
4705            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4706                test_file,
4707                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4708            )
4709            .unwrap();
4710            // Although `Vec<Vec<PageLoacation>>` of each row group is empty,
4711            // we should read the file successfully.
4712            assert!(builder.metadata().offset_index().is_none());
4713            let reader = builder.build().unwrap();
4714            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4715            assert_eq!(batches.len(), 1);
4716        }
4717    }
4718
4719    #[test]
4720    fn test_raw_repetition() {
4721        const MESSAGE_TYPE: &str = "
4722            message Log {
4723              OPTIONAL INT32 eventType;
4724              REPEATED INT32 category;
4725              REPEATED group filter {
4726                OPTIONAL INT32 error;
4727              }
4728            }
4729        ";
4730        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4731        let props = Default::default();
4732
4733        let mut buf = Vec::with_capacity(1024);
4734        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4735        let mut row_group_writer = writer.next_row_group().unwrap();
4736
4737        // column 0
4738        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4739        col_writer
4740            .typed::<Int32Type>()
4741            .write_batch(&[1], Some(&[1]), None)
4742            .unwrap();
4743        col_writer.close().unwrap();
4744        // column 1
4745        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4746        col_writer
4747            .typed::<Int32Type>()
4748            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4749            .unwrap();
4750        col_writer.close().unwrap();
4751        // column 2
4752        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4753        col_writer
4754            .typed::<Int32Type>()
4755            .write_batch(&[1], Some(&[1]), Some(&[0]))
4756            .unwrap();
4757        col_writer.close().unwrap();
4758
4759        let rg_md = row_group_writer.close().unwrap();
4760        assert_eq!(rg_md.num_rows(), 1);
4761        writer.close().unwrap();
4762
4763        let bytes = Bytes::from(buf);
4764
4765        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4766        let full = no_mask.next().unwrap().unwrap();
4767
4768        assert_eq!(full.num_columns(), 3);
4769
4770        for idx in 0..3 {
4771            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4772            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4773            let mut reader = b.with_projection(mask).build().unwrap();
4774            let projected = reader.next().unwrap().unwrap();
4775
4776            assert_eq!(projected.num_columns(), 1);
4777            assert_eq!(full.column(idx), projected.column(0));
4778        }
4779    }
4780
4781    #[test]
4782    fn test_read_lz4_raw() {
4783        let testdata = arrow::util::test_util::parquet_test_data();
4784        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4785        let file = File::open(path).unwrap();
4786
4787        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4788            .unwrap()
4789            .collect::<Result<Vec<_>, _>>()
4790            .unwrap();
4791        assert_eq!(batches.len(), 1);
4792        let batch = &batches[0];
4793
4794        assert_eq!(batch.num_columns(), 3);
4795        assert_eq!(batch.num_rows(), 4);
4796
4797        // https://github.com/apache/parquet-testing/pull/18
4798        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4799        assert_eq!(
4800            a.values(),
4801            &[1593604800, 1593604800, 1593604801, 1593604801]
4802        );
4803
4804        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4805        let a: Vec<_> = a.iter().flatten().collect();
4806        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4807
4808        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4809        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4810    }
4811
4812    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
4813    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4814    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4815    //    LZ4_HADOOP algorithm for compression.
4816    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4817    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4818    //    older parquet-cpp versions.
4819    //
4820    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4821    #[test]
4822    fn test_read_lz4_hadoop_fallback() {
4823        for file in [
4824            "hadoop_lz4_compressed.parquet",
4825            "non_hadoop_lz4_compressed.parquet",
4826        ] {
4827            let testdata = arrow::util::test_util::parquet_test_data();
4828            let path = format!("{testdata}/{file}");
4829            let file = File::open(path).unwrap();
4830            let expected_rows = 4;
4831
4832            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4833                .unwrap()
4834                .collect::<Result<Vec<_>, _>>()
4835                .unwrap();
4836            assert_eq!(batches.len(), 1);
4837            let batch = &batches[0];
4838
4839            assert_eq!(batch.num_columns(), 3);
4840            assert_eq!(batch.num_rows(), expected_rows);
4841
4842            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4843            assert_eq!(
4844                a.values(),
4845                &[1593604800, 1593604800, 1593604801, 1593604801]
4846            );
4847
4848            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4849            let b: Vec<_> = b.iter().flatten().collect();
4850            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4851
4852            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4853            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4854        }
4855    }
4856
4857    #[test]
4858    fn test_read_lz4_hadoop_large() {
4859        let testdata = arrow::util::test_util::parquet_test_data();
4860        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4861        let file = File::open(path).unwrap();
4862        let expected_rows = 10000;
4863
4864        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4865            .unwrap()
4866            .collect::<Result<Vec<_>, _>>()
4867            .unwrap();
4868        assert_eq!(batches.len(), 1);
4869        let batch = &batches[0];
4870
4871        assert_eq!(batch.num_columns(), 1);
4872        assert_eq!(batch.num_rows(), expected_rows);
4873
4874        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4875        let a: Vec<_> = a.iter().flatten().collect();
4876        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4877        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4878        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4879        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4880    }
4881
4882    #[test]
4883    #[cfg(feature = "snap")]
4884    fn test_read_nested_lists() {
4885        let testdata = arrow::util::test_util::parquet_test_data();
4886        let path = format!("{testdata}/nested_lists.snappy.parquet");
4887        let file = File::open(path).unwrap();
4888
4889        let f = file.try_clone().unwrap();
4890        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4891        let expected = reader.next().unwrap().unwrap();
4892        assert_eq!(expected.num_rows(), 3);
4893
4894        let selection = RowSelection::from(vec![
4895            RowSelector::skip(1),
4896            RowSelector::select(1),
4897            RowSelector::skip(1),
4898        ]);
4899        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4900            .unwrap()
4901            .with_row_selection(selection)
4902            .build()
4903            .unwrap();
4904
4905        let actual = reader.next().unwrap().unwrap();
4906        assert_eq!(actual.num_rows(), 1);
4907        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4908    }
4909
4910    #[test]
4911    fn test_arbitrary_decimal() {
4912        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4913        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4914            .with_precision_and_scale(19, 0)
4915            .unwrap();
4916        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4917            .with_precision_and_scale(12, 0)
4918            .unwrap();
4919        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4920            .with_precision_and_scale(17, 10)
4921            .unwrap();
4922
4923        let written = RecordBatch::try_from_iter([
4924            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4925            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4926            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4927        ])
4928        .unwrap();
4929
4930        let mut buffer = Vec::with_capacity(1024);
4931        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4932        writer.write(&written).unwrap();
4933        writer.close().unwrap();
4934
4935        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4936            .unwrap()
4937            .collect::<Result<Vec<_>, _>>()
4938            .unwrap();
4939
4940        assert_eq!(&written.slice(0, 8), &read[0]);
4941    }
4942
4943    #[test]
4944    fn test_list_skip() {
4945        let mut list = ListBuilder::new(Int32Builder::new());
4946        list.append_value([Some(1), Some(2)]);
4947        list.append_value([Some(3)]);
4948        list.append_value([Some(4)]);
4949        let list = list.finish();
4950        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4951
4952        // First page contains 2 values but only 1 row
4953        let props = WriterProperties::builder()
4954            .set_data_page_row_count_limit(1)
4955            .set_write_batch_size(2)
4956            .build();
4957
4958        let mut buffer = Vec::with_capacity(1024);
4959        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4960        writer.write(&batch).unwrap();
4961        writer.close().unwrap();
4962
4963        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4964        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4965            .unwrap()
4966            .with_row_selection(selection.into())
4967            .build()
4968            .unwrap();
4969        let out = reader.next().unwrap().unwrap();
4970        assert_eq!(out.num_rows(), 1);
4971        assert_eq!(out, batch.slice(2, 1));
4972    }
4973
4974    fn test_decimal32_roundtrip() {
4975        let d = |values: Vec<i32>, p: u8| {
4976            let iter = values.into_iter();
4977            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4978                .with_precision_and_scale(p, 2)
4979                .unwrap()
4980        };
4981
4982        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4983        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4984
4985        let mut buffer = Vec::with_capacity(1024);
4986        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4987        writer.write(&batch).unwrap();
4988        writer.close().unwrap();
4989
4990        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4991        let t1 = builder.parquet_schema().columns()[0].physical_type();
4992        assert_eq!(t1, PhysicalType::INT32);
4993
4994        let mut reader = builder.build().unwrap();
4995        assert_eq!(batch.schema(), reader.schema());
4996
4997        let out = reader.next().unwrap().unwrap();
4998        assert_eq!(batch, out);
4999    }
5000
5001    fn test_decimal64_roundtrip() {
5002        // Precision <= 9 -> INT32
5003        // Precision <= 18 -> INT64
5004
5005        let d = |values: Vec<i64>, p: u8| {
5006            let iter = values.into_iter();
5007            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5008                .with_precision_and_scale(p, 2)
5009                .unwrap()
5010        };
5011
5012        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5013        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5014        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5015
5016        let batch = RecordBatch::try_from_iter([
5017            ("d1", Arc::new(d1) as ArrayRef),
5018            ("d2", Arc::new(d2) as ArrayRef),
5019            ("d3", Arc::new(d3) as ArrayRef),
5020        ])
5021        .unwrap();
5022
5023        let mut buffer = Vec::with_capacity(1024);
5024        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5025        writer.write(&batch).unwrap();
5026        writer.close().unwrap();
5027
5028        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5029        let t1 = builder.parquet_schema().columns()[0].physical_type();
5030        assert_eq!(t1, PhysicalType::INT32);
5031        let t2 = builder.parquet_schema().columns()[1].physical_type();
5032        assert_eq!(t2, PhysicalType::INT64);
5033        let t3 = builder.parquet_schema().columns()[2].physical_type();
5034        assert_eq!(t3, PhysicalType::INT64);
5035
5036        let mut reader = builder.build().unwrap();
5037        assert_eq!(batch.schema(), reader.schema());
5038
5039        let out = reader.next().unwrap().unwrap();
5040        assert_eq!(batch, out);
5041    }
5042
5043    fn test_decimal_roundtrip<T: DecimalType>() {
5044        // Precision <= 9 -> INT32
5045        // Precision <= 18 -> INT64
5046        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5047
5048        let d = |values: Vec<usize>, p: u8| {
5049            let iter = values.into_iter().map(T::Native::usize_as);
5050            PrimitiveArray::<T>::from_iter_values(iter)
5051                .with_precision_and_scale(p, 2)
5052                .unwrap()
5053        };
5054
5055        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5056        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5057        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5058        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5059
5060        let batch = RecordBatch::try_from_iter([
5061            ("d1", Arc::new(d1) as ArrayRef),
5062            ("d2", Arc::new(d2) as ArrayRef),
5063            ("d3", Arc::new(d3) as ArrayRef),
5064            ("d4", Arc::new(d4) as ArrayRef),
5065        ])
5066        .unwrap();
5067
5068        let mut buffer = Vec::with_capacity(1024);
5069        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5070        writer.write(&batch).unwrap();
5071        writer.close().unwrap();
5072
5073        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5074        let t1 = builder.parquet_schema().columns()[0].physical_type();
5075        assert_eq!(t1, PhysicalType::INT32);
5076        let t2 = builder.parquet_schema().columns()[1].physical_type();
5077        assert_eq!(t2, PhysicalType::INT64);
5078        let t3 = builder.parquet_schema().columns()[2].physical_type();
5079        assert_eq!(t3, PhysicalType::INT64);
5080        let t4 = builder.parquet_schema().columns()[3].physical_type();
5081        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5082
5083        let mut reader = builder.build().unwrap();
5084        assert_eq!(batch.schema(), reader.schema());
5085
5086        let out = reader.next().unwrap().unwrap();
5087        assert_eq!(batch, out);
5088    }
5089
5090    #[test]
5091    fn test_decimal() {
5092        test_decimal32_roundtrip();
5093        test_decimal64_roundtrip();
5094        test_decimal_roundtrip::<Decimal128Type>();
5095        test_decimal_roundtrip::<Decimal256Type>();
5096    }
5097
5098    #[test]
5099    fn test_list_selection() {
5100        let schema = Arc::new(Schema::new(vec![Field::new_list(
5101            "list",
5102            Field::new_list_field(ArrowDataType::Utf8, true),
5103            false,
5104        )]));
5105        let mut buf = Vec::with_capacity(1024);
5106
5107        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5108
5109        for i in 0..2 {
5110            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5111            for j in 0..1024 {
5112                list_a_builder.values().append_value(format!("{i} {j}"));
5113                list_a_builder.append(true);
5114            }
5115            let batch =
5116                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5117                    .unwrap();
5118            writer.write(&batch).unwrap();
5119        }
5120        let _metadata = writer.close().unwrap();
5121
5122        let buf = Bytes::from(buf);
5123        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5124            .unwrap()
5125            .with_row_selection(RowSelection::from(vec![
5126                RowSelector::skip(100),
5127                RowSelector::select(924),
5128                RowSelector::skip(100),
5129                RowSelector::select(924),
5130            ]))
5131            .build()
5132            .unwrap();
5133
5134        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5135        let batch = concat_batches(&schema, &batches).unwrap();
5136
5137        assert_eq!(batch.num_rows(), 924 * 2);
5138        let list = batch.column(0).as_list::<i32>();
5139
5140        for w in list.value_offsets().windows(2) {
5141            assert_eq!(w[0] + 1, w[1])
5142        }
5143        let mut values = list.values().as_string::<i32>().iter();
5144
5145        for i in 0..2 {
5146            for j in 100..1024 {
5147                let expected = format!("{i} {j}");
5148                assert_eq!(values.next().unwrap().unwrap(), &expected);
5149            }
5150        }
5151    }
5152
5153    #[test]
5154    fn test_list_selection_fuzz() {
5155        let mut rng = rng();
5156        let schema = Arc::new(Schema::new(vec![Field::new_list(
5157            "list",
5158            Field::new_list(
5159                Field::LIST_FIELD_DEFAULT_NAME,
5160                Field::new_list_field(ArrowDataType::Int32, true),
5161                true,
5162            ),
5163            true,
5164        )]));
5165        let mut buf = Vec::with_capacity(1024);
5166        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5167
5168        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5169
5170        for _ in 0..2048 {
5171            if rng.random_bool(0.2) {
5172                list_a_builder.append(false);
5173                continue;
5174            }
5175
5176            let list_a_len = rng.random_range(0..10);
5177            let list_b_builder = list_a_builder.values();
5178
5179            for _ in 0..list_a_len {
5180                if rng.random_bool(0.2) {
5181                    list_b_builder.append(false);
5182                    continue;
5183                }
5184
5185                let list_b_len = rng.random_range(0..10);
5186                let int_builder = list_b_builder.values();
5187                for _ in 0..list_b_len {
5188                    match rng.random_bool(0.2) {
5189                        true => int_builder.append_null(),
5190                        false => int_builder.append_value(rng.random()),
5191                    }
5192                }
5193                list_b_builder.append(true)
5194            }
5195            list_a_builder.append(true);
5196        }
5197
5198        let array = Arc::new(list_a_builder.finish());
5199        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5200
5201        writer.write(&batch).unwrap();
5202        let _metadata = writer.close().unwrap();
5203
5204        let buf = Bytes::from(buf);
5205
5206        let cases = [
5207            vec![
5208                RowSelector::skip(100),
5209                RowSelector::select(924),
5210                RowSelector::skip(100),
5211                RowSelector::select(924),
5212            ],
5213            vec![
5214                RowSelector::select(924),
5215                RowSelector::skip(100),
5216                RowSelector::select(924),
5217                RowSelector::skip(100),
5218            ],
5219            vec![
5220                RowSelector::skip(1023),
5221                RowSelector::select(1),
5222                RowSelector::skip(1023),
5223                RowSelector::select(1),
5224            ],
5225            vec![
5226                RowSelector::select(1),
5227                RowSelector::skip(1023),
5228                RowSelector::select(1),
5229                RowSelector::skip(1023),
5230            ],
5231        ];
5232
5233        for batch_size in [100, 1024, 2048] {
5234            for selection in &cases {
5235                let selection = RowSelection::from(selection.clone());
5236                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5237                    .unwrap()
5238                    .with_row_selection(selection.clone())
5239                    .with_batch_size(batch_size)
5240                    .build()
5241                    .unwrap();
5242
5243                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5244                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5245                assert_eq!(actual.num_rows(), selection.row_count());
5246
5247                let mut batch_offset = 0;
5248                let mut actual_offset = 0;
5249                for selector in selection.iter() {
5250                    if selector.skip {
5251                        batch_offset += selector.row_count;
5252                        continue;
5253                    }
5254
5255                    assert_eq!(
5256                        batch.slice(batch_offset, selector.row_count),
5257                        actual.slice(actual_offset, selector.row_count)
5258                    );
5259
5260                    batch_offset += selector.row_count;
5261                    actual_offset += selector.row_count;
5262                }
5263            }
5264        }
5265    }
5266
5267    #[test]
5268    fn test_read_old_nested_list() {
5269        use arrow::datatypes::DataType;
5270        use arrow::datatypes::ToByteSlice;
5271
5272        let testdata = arrow::util::test_util::parquet_test_data();
5273        // message my_record {
5274        //     REQUIRED group a (LIST) {
5275        //         REPEATED group array (LIST) {
5276        //             REPEATED INT32 array;
5277        //         }
5278        //     }
5279        // }
5280        // should be read as list<list<int32>>
5281        let path = format!("{testdata}/old_list_structure.parquet");
5282        let test_file = File::open(path).unwrap();
5283
5284        // create expected ListArray
5285        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5286
5287        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5288        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5289
5290        // Construct a list array from the above two
5291        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5292            "array",
5293            DataType::Int32,
5294            false,
5295        ))))
5296        .len(2)
5297        .add_buffer(a_value_offsets)
5298        .add_child_data(a_values.into_data())
5299        .build()
5300        .unwrap();
5301        let a = ListArray::from(a_list_data);
5302
5303        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5304        let mut reader = builder.build().unwrap();
5305        let out = reader.next().unwrap().unwrap();
5306        assert_eq!(out.num_rows(), 1);
5307        assert_eq!(out.num_columns(), 1);
5308        // grab first column
5309        let c0 = out.column(0);
5310        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5311        // get first row: [[1, 2], [3, 4]]
5312        let r0 = c0arr.value(0);
5313        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5314        assert_eq!(r0arr, &a);
5315    }
5316
5317    #[test]
5318    fn test_map_no_value() {
5319        // File schema:
5320        // message schema {
5321        //   required group my_map (MAP) {
5322        //     repeated group key_value {
5323        //       required int32 key;
5324        //       optional int32 value;
5325        //     }
5326        //   }
5327        //   required group my_map_no_v (MAP) {
5328        //     repeated group key_value {
5329        //       required int32 key;
5330        //     }
5331        //   }
5332        //   required group my_list (LIST) {
5333        //     repeated group list {
5334        //       required int32 element;
5335        //     }
5336        //   }
5337        // }
5338        let testdata = arrow::util::test_util::parquet_test_data();
5339        let path = format!("{testdata}/map_no_value.parquet");
5340        let file = File::open(path).unwrap();
5341
5342        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5343            .unwrap()
5344            .build()
5345            .unwrap();
5346        let out = reader.next().unwrap().unwrap();
5347        assert_eq!(out.num_rows(), 3);
5348        assert_eq!(out.num_columns(), 3);
5349        // my_map_no_v and my_list columns should now be equivalent
5350        let c0 = out.column(1).as_list::<i32>();
5351        let c1 = out.column(2).as_list::<i32>();
5352        assert_eq!(c0.len(), c1.len());
5353        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5354    }
5355
5356    #[test]
5357    fn test_read_unknown_logical_type() {
5358        let testdata = arrow::util::test_util::parquet_test_data();
5359        let path = format!("{testdata}/unknown-logical-type.parquet");
5360        let test_file = File::open(path).unwrap();
5361
5362        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5363            .expect("Error creating reader builder");
5364
5365        let schema = builder.metadata().file_metadata().schema_descr();
5366        assert_eq!(
5367            schema.column(0).logical_type_ref(),
5368            Some(&LogicalType::String)
5369        );
5370        assert_eq!(
5371            schema.column(1).logical_type_ref(),
5372            Some(&LogicalType::_Unknown { field_id: 2555 })
5373        );
5374        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5375
5376        let mut reader = builder.build().unwrap();
5377        let out = reader.next().unwrap().unwrap();
5378        assert_eq!(out.num_rows(), 3);
5379        assert_eq!(out.num_columns(), 2);
5380    }
5381
5382    #[test]
5383    fn test_read_row_numbers() {
5384        let file = write_parquet_from_iter(vec![(
5385            "value",
5386            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5387        )]);
5388        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5389
5390        let row_number_field = Arc::new(
5391            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5392        );
5393
5394        let options = ArrowReaderOptions::new()
5395            .with_schema(Arc::new(Schema::new(supplied_fields)))
5396            .with_virtual_columns(vec![row_number_field.clone()])
5397            .unwrap();
5398        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5399            file.try_clone().unwrap(),
5400            options,
5401        )
5402        .expect("reader builder with schema")
5403        .build()
5404        .expect("reader with schema");
5405
5406        let batch = arrow_reader.next().unwrap().unwrap();
5407        let schema = Arc::new(Schema::new(vec![
5408            Field::new("value", ArrowDataType::Int64, false),
5409            (*row_number_field).clone(),
5410        ]));
5411
5412        assert_eq!(batch.schema(), schema);
5413        assert_eq!(batch.num_columns(), 2);
5414        assert_eq!(batch.num_rows(), 3);
5415        assert_eq!(
5416            batch
5417                .column(0)
5418                .as_primitive::<types::Int64Type>()
5419                .iter()
5420                .collect::<Vec<_>>(),
5421            vec![Some(1), Some(2), Some(3)]
5422        );
5423        assert_eq!(
5424            batch
5425                .column(1)
5426                .as_primitive::<types::Int64Type>()
5427                .iter()
5428                .collect::<Vec<_>>(),
5429            vec![Some(0), Some(1), Some(2)]
5430        );
5431    }
5432
5433    #[test]
5434    fn test_read_only_row_numbers() {
5435        let file = write_parquet_from_iter(vec![(
5436            "value",
5437            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5438        )]);
5439        let row_number_field = Arc::new(
5440            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5441        );
5442        let options = ArrowReaderOptions::new()
5443            .with_virtual_columns(vec![row_number_field.clone()])
5444            .unwrap();
5445        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5446        let num_columns = metadata
5447            .metadata
5448            .file_metadata()
5449            .schema_descr()
5450            .num_columns();
5451
5452        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5453            .with_projection(ProjectionMask::none(num_columns))
5454            .build()
5455            .expect("reader with schema");
5456
5457        let batch = arrow_reader.next().unwrap().unwrap();
5458        let schema = Arc::new(Schema::new(vec![row_number_field]));
5459
5460        assert_eq!(batch.schema(), schema);
5461        assert_eq!(batch.num_columns(), 1);
5462        assert_eq!(batch.num_rows(), 3);
5463        assert_eq!(
5464            batch
5465                .column(0)
5466                .as_primitive::<types::Int64Type>()
5467                .iter()
5468                .collect::<Vec<_>>(),
5469            vec![Some(0), Some(1), Some(2)]
5470        );
5471    }
5472
5473    #[test]
5474    fn test_read_row_numbers_row_group_order() -> Result<()> {
5475        // Make a parquet file with 100 rows split across 2 row groups
5476        let array = Int64Array::from_iter_values(5000..5100);
5477        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5478        let mut buffer = Vec::new();
5479        let options = WriterProperties::builder()
5480            .set_max_row_group_row_count(Some(50))
5481            .build();
5482        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5483        // write in 10 row batches as the size limits are enforced after each batch
5484        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5485            writer.write(&batch_chunk)?;
5486        }
5487        writer.close()?;
5488
5489        let row_number_field = Arc::new(
5490            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5491        );
5492
5493        let buffer = Bytes::from(buffer);
5494
5495        let options =
5496            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5497
5498        // read out with normal options
5499        let arrow_reader =
5500            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5501                .build()?;
5502
5503        assert_eq!(
5504            ValuesAndRowNumbers {
5505                values: (5000..5100).collect(),
5506                row_numbers: (0..100).collect()
5507            },
5508            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5509        );
5510
5511        // Now read, out of order row groups
5512        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5513            .with_row_groups(vec![1, 0])
5514            .build()?;
5515
5516        assert_eq!(
5517            ValuesAndRowNumbers {
5518                values: (5050..5100).chain(5000..5050).collect(),
5519                row_numbers: (50..100).chain(0..50).collect(),
5520            },
5521            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5522        );
5523
5524        Ok(())
5525    }
5526
5527    #[derive(Debug, PartialEq)]
5528    struct ValuesAndRowNumbers {
5529        values: Vec<i64>,
5530        row_numbers: Vec<i64>,
5531    }
5532    impl ValuesAndRowNumbers {
5533        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5534            let mut values = vec![];
5535            let mut row_numbers = vec![];
5536            for batch in reader {
5537                let batch = batch.expect("Could not read batch");
5538                values.extend(
5539                    batch
5540                        .column_by_name("col")
5541                        .expect("Could not get col column")
5542                        .as_primitive::<arrow::datatypes::Int64Type>()
5543                        .iter()
5544                        .map(|v| v.expect("Could not get value")),
5545                );
5546
5547                row_numbers.extend(
5548                    batch
5549                        .column_by_name("row_number")
5550                        .expect("Could not get row_number column")
5551                        .as_primitive::<arrow::datatypes::Int64Type>()
5552                        .iter()
5553                        .map(|v| v.expect("Could not get row number"))
5554                        .collect::<Vec<_>>(),
5555                );
5556            }
5557            Self {
5558                values,
5559                row_numbers,
5560            }
5561        }
5562    }
5563
5564    #[test]
5565    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5566        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5567        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5568        assert_eq!(
5569            ArrowReaderOptions::new()
5570                .with_virtual_columns(vec![regular_field])
5571                .unwrap_err()
5572                .to_string(),
5573            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5574        );
5575    }
5576
5577    #[test]
5578    fn test_row_numbers_with_multiple_row_groups() {
5579        test_row_numbers_with_multiple_row_groups_helper(
5580            false,
5581            |path, selection, _row_filter, batch_size| {
5582                let file = File::open(path).unwrap();
5583                let row_number_field = Arc::new(
5584                    Field::new("row_number", ArrowDataType::Int64, false)
5585                        .with_extension_type(RowNumber),
5586                );
5587                let options = ArrowReaderOptions::new()
5588                    .with_virtual_columns(vec![row_number_field])
5589                    .unwrap();
5590                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5591                    .unwrap()
5592                    .with_row_selection(selection)
5593                    .with_batch_size(batch_size)
5594                    .build()
5595                    .expect("Could not create reader");
5596                reader
5597                    .collect::<Result<Vec<_>, _>>()
5598                    .expect("Could not read")
5599            },
5600        );
5601    }
5602
5603    #[test]
5604    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5605        test_row_numbers_with_multiple_row_groups_helper(
5606            true,
5607            |path, selection, row_filter, batch_size| {
5608                let file = File::open(path).unwrap();
5609                let row_number_field = Arc::new(
5610                    Field::new("row_number", ArrowDataType::Int64, false)
5611                        .with_extension_type(RowNumber),
5612                );
5613                let options = ArrowReaderOptions::new()
5614                    .with_virtual_columns(vec![row_number_field])
5615                    .unwrap();
5616                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5617                    .unwrap()
5618                    .with_row_selection(selection)
5619                    .with_batch_size(batch_size)
5620                    .with_row_filter(row_filter.expect("No filter"))
5621                    .build()
5622                    .expect("Could not create reader");
5623                reader
5624                    .collect::<Result<Vec<_>, _>>()
5625                    .expect("Could not read")
5626            },
5627        );
5628    }
5629
5630    #[test]
5631    fn test_read_row_group_indices() {
5632        // create a parquet file with 3 row groups, 2 rows each
5633        let array1 = Int64Array::from(vec![1, 2]);
5634        let array2 = Int64Array::from(vec![3, 4]);
5635        let array3 = Int64Array::from(vec![5, 6]);
5636
5637        let batch1 =
5638            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5639        let batch2 =
5640            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5641        let batch3 =
5642            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5643
5644        let mut buffer = Vec::new();
5645        let options = WriterProperties::builder()
5646            .set_max_row_group_row_count(Some(2))
5647            .build();
5648        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5649        writer.write(&batch1).unwrap();
5650        writer.write(&batch2).unwrap();
5651        writer.write(&batch3).unwrap();
5652        writer.close().unwrap();
5653
5654        let file = Bytes::from(buffer);
5655        let row_group_index_field = Arc::new(
5656            Field::new("row_group_index", ArrowDataType::Int64, false)
5657                .with_extension_type(RowGroupIndex),
5658        );
5659
5660        let options = ArrowReaderOptions::new()
5661            .with_virtual_columns(vec![row_group_index_field.clone()])
5662            .unwrap();
5663        let mut arrow_reader =
5664            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5665                .expect("reader builder with virtual columns")
5666                .build()
5667                .expect("reader with virtual columns");
5668
5669        let batch = arrow_reader.next().unwrap().unwrap();
5670
5671        assert_eq!(batch.num_columns(), 2);
5672        assert_eq!(batch.num_rows(), 6);
5673
5674        assert_eq!(
5675            batch
5676                .column(0)
5677                .as_primitive::<types::Int64Type>()
5678                .iter()
5679                .collect::<Vec<_>>(),
5680            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5681        );
5682
5683        assert_eq!(
5684            batch
5685                .column(1)
5686                .as_primitive::<types::Int64Type>()
5687                .iter()
5688                .collect::<Vec<_>>(),
5689            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5690        );
5691    }
5692
5693    #[test]
5694    fn test_read_only_row_group_indices() {
5695        let array1 = Int64Array::from(vec![1, 2, 3]);
5696        let array2 = Int64Array::from(vec![4, 5]);
5697
5698        let batch1 =
5699            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5700        let batch2 =
5701            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5702
5703        let mut buffer = Vec::new();
5704        let options = WriterProperties::builder()
5705            .set_max_row_group_row_count(Some(3))
5706            .build();
5707        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5708        writer.write(&batch1).unwrap();
5709        writer.write(&batch2).unwrap();
5710        writer.close().unwrap();
5711
5712        let file = Bytes::from(buffer);
5713        let row_group_index_field = Arc::new(
5714            Field::new("row_group_index", ArrowDataType::Int64, false)
5715                .with_extension_type(RowGroupIndex),
5716        );
5717
5718        let options = ArrowReaderOptions::new()
5719            .with_virtual_columns(vec![row_group_index_field.clone()])
5720            .unwrap();
5721        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5722        let num_columns = metadata
5723            .metadata
5724            .file_metadata()
5725            .schema_descr()
5726            .num_columns();
5727
5728        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5729            .with_projection(ProjectionMask::none(num_columns))
5730            .build()
5731            .expect("reader with virtual columns only");
5732
5733        let batch = arrow_reader.next().unwrap().unwrap();
5734        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5735
5736        assert_eq!(batch.schema(), schema);
5737        assert_eq!(batch.num_columns(), 1);
5738        assert_eq!(batch.num_rows(), 5);
5739
5740        assert_eq!(
5741            batch
5742                .column(0)
5743                .as_primitive::<types::Int64Type>()
5744                .iter()
5745                .collect::<Vec<_>>(),
5746            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5747        );
5748    }
5749
5750    #[test]
5751    fn test_read_row_group_indices_with_selection() -> Result<()> {
5752        let mut buffer = Vec::new();
5753        let options = WriterProperties::builder()
5754            .set_max_row_group_row_count(Some(10))
5755            .build();
5756
5757        let schema = Arc::new(Schema::new(vec![Field::new(
5758            "value",
5759            ArrowDataType::Int64,
5760            false,
5761        )]));
5762
5763        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5764
5765        // write out 3 batches of 10 rows each
5766        for i in 0..3 {
5767            let start = i * 10;
5768            let array = Int64Array::from_iter_values(start..start + 10);
5769            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5770            writer.write(&batch)?;
5771        }
5772        writer.close()?;
5773
5774        let file = Bytes::from(buffer);
5775        let row_group_index_field = Arc::new(
5776            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5777        );
5778
5779        let options =
5780            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5781
5782        // test row groups are read in reverse order
5783        let arrow_reader =
5784            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5785                .with_row_groups(vec![2, 1, 0])
5786                .build()?;
5787
5788        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5789        let combined = concat_batches(&batches[0].schema(), &batches)?;
5790
5791        let values = combined.column(0).as_primitive::<types::Int64Type>();
5792        let first_val = values.value(0);
5793        let last_val = values.value(combined.num_rows() - 1);
5794        // first row from rg 2
5795        assert_eq!(first_val, 20);
5796        // the last row from rg 0
5797        assert_eq!(last_val, 9);
5798
5799        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5800        assert_eq!(rg_indices.value(0), 2);
5801        assert_eq!(rg_indices.value(10), 1);
5802        assert_eq!(rg_indices.value(20), 0);
5803
5804        Ok(())
5805    }
5806
5807    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5808        use_filter: bool,
5809        test_case: F,
5810    ) where
5811        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5812    {
5813        let seed: u64 = random();
5814        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5815        let mut rng = StdRng::seed_from_u64(seed);
5816
5817        use tempfile::TempDir;
5818        let tempdir = TempDir::new().expect("Could not create temp dir");
5819
5820        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5821
5822        let path = tempdir.path().join("test.parquet");
5823        std::fs::write(&path, bytes).expect("Could not write file");
5824
5825        let mut case = vec![];
5826        let mut remaining = metadata.file_metadata().num_rows();
5827        while remaining > 0 {
5828            let row_count = rng.random_range(1..=remaining);
5829            remaining -= row_count;
5830            case.push(RowSelector {
5831                row_count: row_count as usize,
5832                skip: rng.random_bool(0.5),
5833            });
5834        }
5835
5836        let filter = use_filter.then(|| {
5837            let filter = (0..metadata.file_metadata().num_rows())
5838                .map(|_| rng.random_bool(0.99))
5839                .collect::<Vec<_>>();
5840            let mut filter_offset = 0;
5841            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5842                ProjectionMask::all(),
5843                move |b| {
5844                    let array = BooleanArray::from_iter(
5845                        filter
5846                            .iter()
5847                            .skip(filter_offset)
5848                            .take(b.num_rows())
5849                            .map(|x| Some(*x)),
5850                    );
5851                    filter_offset += b.num_rows();
5852                    Ok(array)
5853                },
5854            ))])
5855        });
5856
5857        let selection = RowSelection::from(case);
5858        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5859
5860        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5861            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5862            return;
5863        }
5864        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5865            .expect("Failed to concatenate");
5866        // assert_eq!(selection.row_count(), actual.num_rows());
5867        let values = actual
5868            .column(0)
5869            .as_primitive::<types::Int64Type>()
5870            .iter()
5871            .collect::<Vec<_>>();
5872        let row_numbers = actual
5873            .column(1)
5874            .as_primitive::<types::Int64Type>()
5875            .iter()
5876            .collect::<Vec<_>>();
5877        assert_eq!(
5878            row_numbers
5879                .into_iter()
5880                .map(|number| number.map(|number| number + 1))
5881                .collect::<Vec<_>>(),
5882            values
5883        );
5884    }
5885
5886    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5887        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5888            "value",
5889            ArrowDataType::Int64,
5890            false,
5891        )])));
5892
5893        let mut buf = Vec::with_capacity(1024);
5894        let mut writer =
5895            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5896
5897        let mut values = 1..=rng.random_range(1..4096);
5898        while !values.is_empty() {
5899            let batch_values = values
5900                .by_ref()
5901                .take(rng.random_range(1..4096))
5902                .collect::<Vec<_>>();
5903            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5904            let batch =
5905                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5906            writer.write(&batch).expect("Could not write batch");
5907            writer.flush().expect("Could not flush");
5908        }
5909        let metadata = writer.close().expect("Could not close writer");
5910
5911        (Bytes::from(buf), metadata)
5912    }
5913}