
parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains the reader that reads Parquet data into Arrow [`RecordBatch`]es
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Offset and limit: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and allows the most low-level optimizations.
///
/// However, most Parquet based systems will apply filters at many steps prior
/// to decoding, such as pruning files, row groups and data pages. This crate
/// provides the low-level APIs needed to implement such filtering, but does not
/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
94/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
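/// For example, a sketch using the synchronous [`ParquetRecordBatchReaderBuilder`]
/// (the in-memory file below is written purely for illustration; the other
/// builders expose the same configuration methods):
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::{ArrowWriter, ProjectionMask};
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # let mut file: Vec<u8> = Vec::with_capacity(1024);
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// # let file = Bytes::from(file);
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
/// let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
/// let reader = builder
///     // only decode the first leaf column
///     .with_projection(ProjectionMask::leaves(&schema_descr, [0]))
///     // only read the first row group
///     .with_row_groups(vec![0])
///     .with_batch_size(1024)
///     .build()
///     .unwrap();
/// # for batch in reader { let _ = batch.unwrap(); }
/// ```
///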
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    ///
    /// If `batch_size` is larger than the number of rows in the file, the file
    /// row count is used instead.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer for more rows than the file contains
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
225    /// Configure how row selections should be materialised during execution
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
250    /// It is recommended to enable writing the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
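    /// For example, a minimal sketch (independent of any particular file) that
    /// builds the selection above from [`RowSelector`] entries:
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // rest of row group 2 + first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // rest of row group 3
    /// ]);
    /// # let _ = selection;
    /// ```
    ///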
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
307    ///
308    /// See the [blog post on late materialization] for a more technical explanation.
309    ///
310    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
311    ///
312    /// # Example
313    /// ```rust
314    /// # use std::fs::File;
315    /// # use arrow_array::Int32Array;
316    /// # use parquet::arrow::ProjectionMask;
317    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
318    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
319    /// # let testdata = arrow::util::test_util::parquet_test_data();
320    /// # let path = format!("{testdata}/alltypes_plain.parquet");
321    /// # let file = File::open(&path)?;
322    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
323    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
324    /// // Create predicate that evaluates `int_col != 1`.
325    /// // `int_col` column has index 4 (zero based) in the schema
326    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
327    /// // Only the projection columns are passed to the predicate so
328    /// // int_col is column 0 in the predicate
329    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
330    ///     let int_col = batch.column(0);
331    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
332    /// });
333    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
334    /// // The filter will be invoked during the reading process
335    /// let reader = builder.with_row_filter(row_filter).build()?;
336    /// # for b in reader { let _ = b?; }
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn with_row_filter(self, filter: RowFilter) -> Self {
341        Self {
342            filter: Some(filter),
343            ..self
344        }
345    }
346
347    /// Provide a limit to the number of rows to be read
348    ///
349    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
350    /// allowing it to limit the final set of rows decoded after any pushed down predicates
351    ///
352    /// It is recommended to enable reading the page index if using this functionality, to allow
353    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
354    pub fn with_limit(self, limit: usize) -> Self {
355        Self {
356            limit: Some(limit),
357            ..self
358        }
359    }
360
361    /// Provide an offset to skip over the given number of rows
362    ///
363    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
364    /// allowing it to skip rows after any pushed down predicates
365    ///
366    /// It is recommended to enable reading the page index if using this functionality, to allow
367    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
368    pub fn with_offset(self, offset: usize) -> Self {
369        Self {
370            offset: Some(offset),
371            ..self
372        }
373    }
374
375    /// Specify metrics collection during reading
376    ///
377    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
378    /// clone of the provided metrics to the builder.
379    ///
380    /// For example:
381    ///
382    /// ```rust
383    /// # use std::sync::Arc;
384    /// # use bytes::Bytes;
385    /// # use arrow_array::{Int32Array, RecordBatch};
386    /// # use arrow_schema::{DataType, Field, Schema};
387    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
388    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
389    /// # use parquet::arrow::ArrowWriter;
390    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// # let file = Bytes::from(file);
397    /// // Create metrics object to pass into the reader
398    /// let metrics = ArrowReaderMetrics::enabled();
399    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
400    ///   // Configure the builder to use the metrics by passing a clone
401    ///   .with_metrics(metrics.clone())
402    ///   // Build the reader
403    ///   .build().unwrap();
404    /// // .. read data from the reader ..
405    ///
406    /// // check the metrics
407    /// assert!(metrics.records_read_from_inner().is_some());
408    /// ```
409    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
410        Self { metrics, ..self }
411    }
412
    /// Set the maximum size in bytes of the predicate cache used by the async
    /// decoder. The limit applies per row group, across all columns.
    ///
    /// Defaults to 100MB. Set to `usize::MAX` for an unlimited cache size.
418    ///
419    /// This cache is used to store decoded arrays that are used in
420    /// predicate evaluation ([`Self::with_row_filter`]).
421    ///
422    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
423    /// [this ticket] for more details and alternatives.
424    ///
425    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
426    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
427    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
428        Self {
429            max_predicate_cache_size,
430            ..self
431        }
432    }
433}
434
435/// Options that control how [`ParquetMetaData`] is read when constructing
436/// an Arrow reader.
437///
438/// To use these options, pass them to one of the following methods:
439/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
440/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
441///
/// For fine-grained control over metadata loading, use
/// [`ArrowReaderMetadata::load`] to load metadata with these options.
///
/// See [`ArrowReaderBuilder`] for configuring how the column data is then read
/// from the file, including projection and filter pushdown.
447///
448/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
449#[derive(Debug, Clone, Default)]
450pub struct ArrowReaderOptions {
451    /// Should the reader strip any user defined metadata from the Arrow schema
452    skip_arrow_metadata: bool,
453    /// If provided, used as the schema hint when determining the Arrow schema,
454    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
455    ///
456    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
457    supplied_schema: Option<SchemaRef>,
458
459    pub(crate) column_index: PageIndexPolicy,
460    pub(crate) offset_index: PageIndexPolicy,
461
462    /// Options to control reading of Parquet metadata
463    metadata_options: ParquetMetaDataOptions,
464    /// If encryption is enabled, the file decryption properties can be provided
465    #[cfg(feature = "encryption")]
466    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
467
468    virtual_columns: Vec<FieldRef>,
469}
470
471impl ArrowReaderOptions {
472    /// Create a new [`ArrowReaderOptions`] with the default settings
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    /// Skip decoding the embedded arrow metadata (defaults to `false`)
478    ///
479    /// Parquet files generated by some writers may contain embedded arrow
480    /// schema and metadata.
481    /// This may not be correct or compatible with your system,
482    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
483    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
484        Self {
485            skip_arrow_metadata,
486            ..self
487        }
488    }
489
490    /// Provide a schema hint to use when reading the Parquet file.
491    ///
492    /// If provided, this schema takes precedence over any arrow schema embedded
493    /// in the metadata (see the [`arrow`] documentation for more details).
494    ///
495    /// If the provided schema is not compatible with the data stored in the
496    /// parquet file schema, an error will be returned when constructing the
497    /// builder.
498    ///
    /// This option is only required if you want to explicitly control the
    /// conversion of Parquet types to Arrow types, such as casting a column to
    /// a different type. For example, you may want to read an Int64 column in a
    /// Parquet file as a [`TimestampMicrosecondArray`] in the Arrow schema.
503    ///
504    /// [`arrow`]: crate::arrow
505    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
506    ///
507    /// # Notes
508    ///
509    /// The provided schema must have the same number of columns as the parquet schema and
510    /// the column names must be the same.
511    ///
512    /// # Example
513    /// ```
514    /// # use std::sync::Arc;
515    /// # use bytes::Bytes;
516    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
517    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
518    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
519    /// # use parquet::arrow::ArrowWriter;
520    /// // Write data - schema is inferred from the data to be Int32
521    /// let mut file = Vec::new();
522    /// let batch = RecordBatch::try_from_iter(vec![
523    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
524    /// ]).unwrap();
525    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
526    /// writer.write(&batch).unwrap();
527    /// writer.close().unwrap();
528    /// let file = Bytes::from(file);
529    ///
530    /// // Read the file back.
531    /// // Supply a schema that interprets the Int32 column as a Timestamp.
532    /// let supplied_schema = Arc::new(Schema::new(vec![
533    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
534    /// ]));
535    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
536    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
537    ///     file.clone(),
538    ///     options
539    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
540    ///
541    /// // Create the reader and read the data using the supplied schema.
542    /// let mut reader = builder.build().unwrap();
543    /// let _batch = reader.next().unwrap().unwrap();
544    /// ```
545    ///
546    /// # Example: Preserving Dictionary Encoding
547    ///
    /// By default, Parquet string columns are read as `StringArray` (`DataType::Utf8`)
    /// or `LargeStringArray` (`DataType::LargeUtf8`), even if the underlying Parquet
    /// data uses dictionary encoding. You can preserve the dictionary encoding by
    /// specifying a `Dictionary` type in the schema hint:
551    ///
552    /// ```
553    /// # use std::sync::Arc;
554    /// # use bytes::Bytes;
555    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
556    /// # use arrow_schema::{DataType, Field, Schema};
557    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
558    /// # use parquet::arrow::ArrowWriter;
559    /// // Write a Parquet file with string data
560    /// let mut file = Vec::new();
561    /// let schema = Arc::new(Schema::new(vec![
562    ///     Field::new("city", DataType::Utf8, false)
563    /// ]));
564    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
565    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
566    ///
567    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
568    /// writer.write(&batch).unwrap();
569    /// writer.close().unwrap();
570    /// let file = Bytes::from(file);
571    ///
572    /// // Read the file back, requesting dictionary encoding preservation
573    /// let dict_schema = Arc::new(Schema::new(vec![
574    ///     Field::new("city", DataType::Dictionary(
575    ///         Box::new(DataType::Int32),
576    ///         Box::new(DataType::Utf8)
577    ///     ), false)
578    /// ]));
579    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
580    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
581    ///     file.clone(),
582    ///     options
583    /// ).unwrap();
584    ///
585    /// let mut reader = builder.build().unwrap();
586    /// let batch = reader.next().unwrap().unwrap();
587    ///
588    /// // The column is now a DictionaryArray
589    /// assert!(matches!(
590    ///     batch.column(0).data_type(),
591    ///     DataType::Dictionary(_, _)
592    /// ));
593    /// ```
594    ///
595    /// **Note**: Dictionary encoding preservation works best when:
596    /// 1. The original column was dictionary encoded (the default for string columns)
597    /// 2. There are a small number of distinct values
598    pub fn with_schema(self, schema: SchemaRef) -> Self {
599        Self {
600            supplied_schema: Some(schema),
601            skip_arrow_metadata: true,
602            ..self
603        }
604    }
605
606    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
607    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
608    ///
    /// The `PageIndex` can be used by query engines to push down predicates to
    /// the parquet scan, potentially eliminating unnecessary IO.
611    ///
612    /// If this is enabled, [`ParquetMetaData::column_index`] and
613    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
614    /// information is present in the file.
615    ///
616    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
617    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
618    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
619    pub fn with_page_index(self, page_index: bool) -> Self {
620        self.with_page_index_policy(PageIndexPolicy::from(page_index))
621    }
622
623    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
624    ///
625    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
626    /// This method sets the same policy for both. For fine-grained control, use
627    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
628    ///
629    /// See [`Self::with_page_index`] for more details on page indexes.
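    ///
    /// For example, a sketch of the fine-grained form (using the `From<bool>`
    /// conversion to pick an enabled or disabled policy):
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// use parquet::file::metadata::PageIndexPolicy;
    ///
    /// // Enable reading the offset index, but skip the column index
    /// let options = ArrowReaderOptions::new()
    ///     .with_offset_index_policy(PageIndexPolicy::from(true))
    ///     .with_column_index_policy(PageIndexPolicy::from(false));
    /// # let _ = options;
    /// ```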
630    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
631        self.with_column_index_policy(policy)
632            .with_offset_index_policy(policy)
633    }
634
635    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
636    ///
637    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
638    /// for predicate pushdown and page-level pruning.
639    ///
640    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
641    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
642        self.column_index = policy;
643        self
644    }
645
646    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
647    ///
648    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
649    /// efficient page-level skipping and random access within column chunks.
650    ///
651    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
652    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
653        self.offset_index = policy;
654        self
655    }
656
657    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
658    /// footer will be skipped.
659    ///
660    /// This can be used to avoid reparsing the schema from the file when it is
661    /// already known.
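    ///
    /// For example, a sketch that loads the metadata once (here from an
    /// in-memory file written purely for illustration) and reuses its schema
    /// for subsequent reads:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # let mut file: Vec<u8> = Vec::new();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // Load the metadata (and therefore the schema) once
    /// let loaded = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new()).unwrap();
    /// // Reuse the already parsed schema when decoding the metadata again
    /// let schema = loaded.metadata().file_metadata().schema_descr_ptr();
    /// let options = ArrowReaderOptions::new().with_parquet_schema(schema);
    /// # let _ = options;
    /// ```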
662    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
663        self.metadata_options.set_schema(schema);
664        self
665    }
666
667    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
668    /// (defaults to `false`).
669    ///
670    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
671    /// might be desirable.
672    ///
673    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
674    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
675    /// [`encoding_stats`]:
676    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
677    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
678        self.metadata_options.set_encoding_stats_as_mask(val);
679        self
680    }
681
682    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
683    ///
684    /// [`encoding_stats`]:
685    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
686    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
687        self.metadata_options.set_encoding_stats_policy(policy);
688        self
689    }
690
691    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
692    ///
693    /// [`statistics`]:
694    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
695    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
696        self.metadata_options.set_column_stats_policy(policy);
697        self
698    }
699
700    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
701    ///
702    /// [`size_statistics`]:
703    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
704    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
705        self.metadata_options.set_size_stats_policy(policy);
706        self
707    }
708
709    /// Provide the file decryption properties to use when reading encrypted parquet files.
710    ///
711    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
712    #[cfg(feature = "encryption")]
713    pub fn with_file_decryption_properties(
714        self,
715        file_decryption_properties: Arc<FileDecryptionProperties>,
716    ) -> Self {
717        Self {
718            file_decryption_properties: Some(file_decryption_properties),
719            ..self
720        }
721    }
722
723    /// Include virtual columns in the output.
724    ///
725    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
726    ///
727    /// # Example
728    /// ```
729    /// # use std::sync::Arc;
730    /// # use bytes::Bytes;
731    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
732    /// # use arrow_schema::{DataType, Field, Schema};
733    /// # use parquet::arrow::{ArrowWriter, RowNumber};
734    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
735    /// #
736    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
737    /// // Create a simple record batch with some data
738    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
739    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
740    ///
741    /// // Write the batch to an in-memory buffer
742    /// let mut file = Vec::new();
743    /// let mut writer = ArrowWriter::try_new(
744    ///     &mut file,
745    ///     batch.schema(),
746    ///     None
747    /// )?;
748    /// writer.write(&batch)?;
749    /// writer.close()?;
750    /// let file = Bytes::from(file);
751    ///
752    /// // Create a virtual column for row numbers
753    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
754    ///     .with_extension_type(RowNumber));
755    ///
756    /// // Configure options with virtual columns
757    /// let options = ArrowReaderOptions::new()
758    ///     .with_virtual_columns(vec![row_number_field])?;
759    ///
760    /// // Create a reader with the options
761    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
762    ///     file,
763    ///     options
764    /// )?
765    /// .build()?;
766    ///
767    /// // Read the batch - it will include both the original column and the virtual row_number column
768    /// let result_batch = reader.next().unwrap()?;
769    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
770    /// assert_eq!(result_batch.num_rows(), 3);
771    /// #
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
776        // Validate that all fields are virtual columns
777        for field in &virtual_columns {
778            if !is_virtual_column(field) {
779                return Err(ParquetError::General(format!(
780                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
781                    field.name()
782                )));
783            }
784        }
785        Ok(Self {
786            virtual_columns,
787            ..self
788        })
789    }
790
791    #[deprecated(
792        since = "57.2.0",
793        note = "Use `column_index_policy` or `offset_index_policy` instead"
794    )]
795    /// Returns whether page index reading is enabled.
796    ///
797    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
798    ///
799    /// This can be set via [`with_page_index`][Self::with_page_index] or
800    /// [`with_page_index_policy`][Self::with_page_index_policy].
801    pub fn page_index(&self) -> bool {
802        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
803    }
804
805    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
806    ///
807    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
808    /// or [`with_page_index_policy`][Self::with_page_index_policy].
809    pub fn offset_index_policy(&self) -> PageIndexPolicy {
810        self.offset_index
811    }
812
813    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
814    ///
815    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
816    /// or [`with_page_index_policy`][Self::with_page_index_policy].
817    pub fn column_index_policy(&self) -> PageIndexPolicy {
818        self.column_index
819    }
820
821    /// Retrieve the currently set metadata decoding options.
822    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
823        &self.metadata_options
824    }
825
826    /// Retrieve the currently set file decryption properties.
827    ///
828    /// This can be set via
829    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
830    #[cfg(feature = "encryption")]
831    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
832        self.file_decryption_properties.as_ref()
833    }
834}
835
836/// The metadata necessary to construct a [`ArrowReaderBuilder`]
837///
/// Note this structure is cheaply cloneable as it consists of several [`Arc`]s.
839///
840/// This structure allows
841///
842/// 1. Loading metadata for a file once and then using that same metadata to
843///    construct multiple separate readers, for example, to distribute readers
844///    across multiple threads
845///
/// 2. Using a cached copy of the [`ParquetMetaData`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
850#[derive(Debug, Clone)]
851pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
853    pub(crate) metadata: Arc<ParquetMetaData>,
854    /// The Arrow Schema
855    pub(crate) schema: SchemaRef,
856    /// The Parquet schema (root field)
857    pub(crate) fields: Option<Arc<ParquetField>>,
858}
859
860impl ArrowReaderMetadata {
861    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
862    /// and [`ChunkReader`]
863    ///
864    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
865    /// example of how this can be used
866    ///
867    /// # Notes
868    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] enabled, but
    /// the loaded metadata is missing the page index, this function will
    /// attempt to load the page index by performing additional reads from `reader`.
872    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
873        let metadata = ParquetMetaDataReader::new()
874            .with_column_index_policy(options.column_index)
875            .with_offset_index_policy(options.offset_index)
876            .with_metadata_options(Some(options.metadata_options.clone()));
877        #[cfg(feature = "encryption")]
878        let metadata = metadata.with_decryption_properties(
879            options.file_decryption_properties.as_ref().map(Arc::clone),
880        );
881        let metadata = metadata.parse_and_finish(reader)?;
882        Self::try_new(Arc::new(metadata), options)
883    }
884
885    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
886    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
887    ///
888    /// # Notes
889    ///
890    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
891    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
892    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
893        match options.supplied_schema {
894            Some(supplied_schema) => Self::with_supplied_schema(
895                metadata,
896                supplied_schema.clone(),
897                &options.virtual_columns,
898            ),
899            None => {
900                let kv_metadata = match options.skip_arrow_metadata {
901                    true => None,
902                    false => metadata.file_metadata().key_value_metadata(),
903                };
904
905                let (schema, fields) = parquet_to_arrow_schema_and_fields(
906                    metadata.file_metadata().schema_descr(),
907                    ProjectionMask::all(),
908                    kv_metadata,
909                    &options.virtual_columns,
910                )?;
911
912                Ok(Self {
913                    metadata,
914                    schema: Arc::new(schema),
915                    fields: fields.map(Arc::new),
916                })
917            }
918        }
919    }
920
921    fn with_supplied_schema(
922        metadata: Arc<ParquetMetaData>,
923        supplied_schema: SchemaRef,
924        virtual_columns: &[FieldRef],
925    ) -> Result<Self> {
926        let parquet_schema = metadata.file_metadata().schema_descr();
927        let field_levels = parquet_to_arrow_field_levels_with_virtual(
928            parquet_schema,
929            ProjectionMask::all(),
930            Some(supplied_schema.fields()),
931            virtual_columns,
932        )?;
933        let fields = field_levels.fields;
934        let inferred_len = fields.len();
935        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
        // different lengths, but we check here to be safe.
939        if inferred_len != supplied_len {
940            return Err(arrow_err!(format!(
941                "Incompatible supplied Arrow schema: expected {} columns received {}",
942                inferred_len, supplied_len
943            )));
944        }
945
946        let mut errors = Vec::new();
947
948        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
949
950        for (field1, field2) in field_iter {
951            if field1.data_type() != field2.data_type() {
952                errors.push(format!(
953                    "data type mismatch for field {}: requested {} but found {}",
954                    field1.name(),
955                    field1.data_type(),
956                    field2.data_type()
957                ));
958            }
959            if field1.is_nullable() != field2.is_nullable() {
960                errors.push(format!(
961                    "nullability mismatch for field {}: expected {:?} but found {:?}",
962                    field1.name(),
963                    field1.is_nullable(),
964                    field2.is_nullable()
965                ));
966            }
967            if field1.metadata() != field2.metadata() {
968                errors.push(format!(
969                    "metadata mismatch for field {}: expected {:?} but found {:?}",
970                    field1.name(),
971                    field1.metadata(),
972                    field2.metadata()
973                ));
974            }
975        }
976
977        if !errors.is_empty() {
978            let message = errors.join(", ");
979            return Err(ParquetError::ArrowError(format!(
980                "Incompatible supplied Arrow schema: {message}",
981            )));
982        }
983
984        Ok(Self {
985            metadata,
986            schema: supplied_schema,
987            fields: field_levels.levels.map(Arc::new),
988        })
989    }
990
991    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
992    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
993        &self.metadata
994    }
995
996    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
997    pub fn parquet_schema(&self) -> &SchemaDescriptor {
998        self.metadata.file_metadata().schema_descr()
999    }
1000
1001    /// Returns the arrow [`SchemaRef`] for this parquet file
1002    pub fn schema(&self) -> &SchemaRef {
1003        &self.schema
1004    }
1005}
1006
1007#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async readers
1009pub struct SyncReader<T: ChunkReader>(T);
1010
1011impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1012    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1013        f.debug_tuple("SyncReader").field(&self.0).finish()
1014    }
1015}
1016
1017/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
1018///
1019/// # See Also
1020/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
1021/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
1022/// * [`ArrowReaderBuilder`] for additional member functions
1023pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1024
1025impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1026    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1027    ///
1028    /// ```
1029    /// # use std::sync::Arc;
1030    /// # use bytes::Bytes;
1031    /// # use arrow_array::{Int32Array, RecordBatch};
1032    /// # use arrow_schema::{DataType, Field, Schema};
1033    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1034    /// # use parquet::arrow::ArrowWriter;
1035    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1036    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1037    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1038    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1039    /// # writer.write(&batch).unwrap();
1040    /// # writer.close().unwrap();
1041    /// # let file = Bytes::from(file);
1042    /// // Build the reader from anything that implements `ChunkReader`
1043    /// // such as a `File`, or `Bytes`
1044    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045    /// // The builder has access to ParquetMetaData such
1046    /// // as the number and layout of row groups
1047    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1048    /// // Call build to create the reader
1049    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1050    /// // Read data
1051    /// while let Some(batch) = reader.next().transpose()? {
1052    ///     println!("Read {} rows", batch.num_rows());
1053    /// }
1054    /// # Ok::<(), parquet::errors::ParquetError>(())
1055    /// ```
1056    pub fn try_new(reader: T) -> Result<Self> {
1057        Self::try_new_with_options(reader, Default::default())
1058    }
1059
1060    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1061    ///
1062    /// Use this method if you want to control the options for reading the
1063    /// [`ParquetMetaData`]
1064    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1065        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1066        Ok(Self::new_with_metadata(reader, metadata))
1067    }
1068
1069    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1070    ///
1071    /// Use this method if you already have [`ParquetMetaData`] for a file.
1072    /// This interface allows:
1073    ///
1074    /// 1. Loading metadata once and using it to create multiple builders with
1075    ///    potentially different settings or run on different threads
1076    ///
1077    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1078    ///    file each time a reader is constructed.
1079    ///
1080    /// See the docs on [`ArrowReaderMetadata`] for more details
1081    ///
1082    /// # Example
1083    /// ```
1084    /// # use std::fs::metadata;
1085    /// # use std::sync::Arc;
1086    /// # use bytes::Bytes;
1087    /// # use arrow_array::{Int32Array, RecordBatch};
1088    /// # use arrow_schema::{DataType, Field, Schema};
1089    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1090    /// # use parquet::arrow::ArrowWriter;
1091    /// #
1092    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1093    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1094    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1095    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1096    /// # writer.write(&batch).unwrap();
1097    /// # writer.close().unwrap();
1098    /// # let file = Bytes::from(file);
1099    /// #
1100    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1101    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1102    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1103    ///
1104    /// // Should be able to read from both in parallel
1105    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1106    /// ```
1107    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1108        Self::new_builder(SyncReader(input), metadata)
1109    }
1110
    /// Read the bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This function should be called after other forms of pruning, such as
    /// projection and predicate pushdown.
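    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a file named `data.parquet` whose first row
    /// group has a bloom filter written for an integer leaf column 0:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// let file = File::open("data.parquet")?;
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // A bloom filter can report false positives, but never false negatives
    ///     let maybe_present = sbbf.check(&123_i64);
    ///     println!("value possibly present: {maybe_present}");
    /// }
    /// # Ok(())
    /// # }
    /// ```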
1116    pub fn get_row_group_column_bloom_filter(
1117        &self,
1118        row_group_idx: usize,
1119        column_idx: usize,
1120    ) -> Result<Option<Sbbf>> {
1121        let metadata = self.metadata.row_group(row_group_idx);
1122        let column_metadata = metadata.column(column_idx);
1123
1124        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1125            offset
1126                .try_into()
1127                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1128        } else {
1129            return Ok(None);
1130        };
1131
1132        let buffer = match column_metadata.bloom_filter_length() {
1133            Some(length) => self.input.0.get_bytes(offset, length as usize),
1134            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1135        }?;
1136
1137        let (header, bitset_offset) =
1138            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1139
1140        match header.algorithm {
1141            BloomFilterAlgorithm::BLOCK => {
1142                // this match exists to future proof the singleton algorithm enum
1143            }
1144        }
1145        match header.compression {
1146            BloomFilterCompression::UNCOMPRESSED => {
1147                // this match exists to future proof the singleton compression enum
1148            }
1149        }
1150        match header.hash {
1151            BloomFilterHash::XXHASH => {
1152                // this match exists to future proof the singleton hash enum
1153            }
1154        }
1155
1156        let bitset = match column_metadata.bloom_filter_length() {
1157            Some(_) => buffer.slice(
1158                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1159                    - TryInto::<usize>::try_into(offset).unwrap())..,
1160            ),
1161            None => {
1162                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1163                    ParquetError::General("Bloom filter length is invalid".to_string())
1164                })?;
1165                self.input.0.get_bytes(bitset_offset, bitset_length)?
1166            }
1167        };
1168        Ok(Some(Sbbf::new(&bitset)))
1169    }
1170
1171    /// Build a [`ParquetRecordBatchReader`]
1172    ///
1173    /// Note: this will eagerly evaluate any `RowFilter` before returning
1174    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1175        let Self {
1176            input,
1177            metadata,
1178            schema: _,
1179            fields,
1180            batch_size,
1181            row_groups,
1182            projection,
1183            mut filter,
1184            selection,
1185            row_selection_policy,
1186            limit,
1187            offset,
1188            metrics,
1189            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1190            max_predicate_cache_size: _,
1191        } = self;
1192
        // Try to avoid allocating a large buffer for more rows than the file contains
1194        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1195
1196        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1197
1198        let reader = ReaderRowGroups {
1199            reader: Arc::new(input.0),
1200            metadata,
1201            row_groups,
1202        };
1203
1204        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1205            .with_selection(selection)
1206            .with_row_selection_policy(row_selection_policy);
1207
1208        // Update selection based on any filters
1209        if let Some(filter) = filter.as_mut() {
1210            for predicate in filter.predicates.iter_mut() {
1211                // break early if we have ruled out all rows
1212                if !plan_builder.selects_any() {
1213                    break;
1214                }
1215
1216                let mut cache_projection = predicate.projection().clone();
1217                cache_projection.intersect(&projection);
1218
1219                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1220                    .with_parquet_metadata(&reader.metadata)
1221                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1222
1223                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1224            }
1225        }
1226
1227        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1228            .with_parquet_metadata(&reader.metadata)
1229            .build_array_reader(fields.as_deref(), &projection)?;
1230
1231        let read_plan = plan_builder
1232            .limited(reader.num_rows())
1233            .with_offset(offset)
1234            .with_limit(limit)
1235            .build_limited()
1236            .build();
1237
1238        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1239    }
1240}
1241
1242struct ReaderRowGroups<T: ChunkReader> {
1243    reader: Arc<T>,
1244
1245    metadata: Arc<ParquetMetaData>,
    /// List of row group indices to scan
1247    row_groups: Vec<usize>,
1248}
1249
1250impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1251    fn num_rows(&self) -> usize {
1252        let meta = self.metadata.row_groups();
1253        self.row_groups
1254            .iter()
1255            .map(|x| meta[*x].num_rows() as usize)
1256            .sum()
1257    }
1258
1259    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1260        Ok(Box::new(ReaderPageIterator {
1261            column_idx: i,
1262            reader: self.reader.clone(),
1263            metadata: self.metadata.clone(),
1264            row_groups: self.row_groups.clone().into_iter(),
1265        }))
1266    }
1267
1268    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1269        Box::new(
1270            self.row_groups
1271                .iter()
1272                .map(move |i| self.metadata.row_group(*i)),
1273        )
1274    }
1275
1276    fn metadata(&self) -> &ParquetMetaData {
1277        self.metadata.as_ref()
1278    }
1279}
1280
1281struct ReaderPageIterator<T: ChunkReader> {
1282    reader: Arc<T>,
1283    column_idx: usize,
1284    row_groups: std::vec::IntoIter<usize>,
1285    metadata: Arc<ParquetMetaData>,
1286}
1287
1288impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1289    /// Return the next SerializedPageReader
1290    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1291        let rg = self.metadata.row_group(rg_idx);
1292        let column_chunk_metadata = rg.column(self.column_idx);
1293        let offset_index = self.metadata.offset_index();
1294        // The `offset_index` may not exist, and `i[rg_idx]` may be empty.
1295        // To avoid a panic on `i[rg_idx][self.column_idx]`, filter out empty `i[rg_idx]`.
1296        let page_locations = offset_index
1297            .filter(|i| !i[rg_idx].is_empty())
1298            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1299        let total_rows = rg.num_rows() as usize;
1300        let reader = self.reader.clone();
1301
1302        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1303            .add_crypto_context(
1304                rg_idx,
1305                self.column_idx,
1306                self.metadata.as_ref(),
1307                column_chunk_metadata,
1308            )
1309    }
1310}
1311
1312impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1313    type Item = Result<Box<dyn PageReader>>;
1314
1315    fn next(&mut self) -> Option<Self::Item> {
1316        let rg_idx = self.row_groups.next()?;
1317        let page_reader = self
1318            .next_page_reader(rg_idx)
1319            .map(|page_reader| Box::new(page_reader) as _);
1320        Some(page_reader)
1321    }
1322}
1323
1324impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1325
1326/// Reads Parquet data as Arrow [`RecordBatch`]es
1327///
1328/// This struct implements the [`RecordBatchReader`] trait and is an
1329/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
1330///
1331/// Typically reads from either a file or an in-memory buffer ([`Bytes`])
1332///
1333/// Created by [`ParquetRecordBatchReaderBuilder`]
1334///
1335/// [`Bytes`]: bytes::Bytes
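///
/// # Example
///
/// A minimal sketch, assuming `file` is a [`std::fs::File`] containing Parquet
/// data (error handling elided):
///
/// ```ignore
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
///     .with_batch_size(1024)
///     .build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```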
1336pub struct ParquetRecordBatchReader {
1337    array_reader: Box<dyn ArrayReader>,
1338    schema: SchemaRef,
1339    read_plan: ReadPlan,
1340}
1341
1342impl Debug for ParquetRecordBatchReader {
1343    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1344        f.debug_struct("ParquetRecordBatchReader")
1345            .field("array_reader", &"...")
1346            .field("schema", &self.schema)
1347            .field("read_plan", &self.read_plan)
1348            .finish()
1349    }
1350}
1351
1352impl Iterator for ParquetRecordBatchReader {
1353    type Item = Result<RecordBatch, ArrowError>;
1354
1355    fn next(&mut self) -> Option<Self::Item> {
1356        self.next_inner()
1357            .map_err(|arrow_err| arrow_err.into())
1358            .transpose()
1359    }
1360}
1361
1362impl ParquetRecordBatchReader {
1363    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1364    /// has reached the end of the file.
1365    ///
1366    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1367    /// simplify error handling with `?`
1368    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1369        let mut read_records = 0;
1370        let batch_size = self.batch_size();
1371        if batch_size == 0 {
1372            return Ok(None);
1373        }
1374        match self.read_plan.row_selection_cursor_mut() {
1375            RowSelectionCursor::Mask(mask_cursor) => {
1376                // Stream the record batch reader using contiguous segments of the selection
1377                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1378                while !mask_cursor.is_empty() {
1379                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1380                        return Ok(None);
1381                    };
1382
1383                    if mask_chunk.initial_skip > 0 {
1384                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1385                        if skipped != mask_chunk.initial_skip {
1386                            return Err(general_err!(
1387                                "failed to skip rows, expected {}, got {}",
1388                                mask_chunk.initial_skip,
1389                                skipped
1390                            ));
1391                        }
1392                    }
1393
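                    // A chunk may consist entirely of skipped rows. If the cursor is
                    // exhausted and nothing was selected, the read is complete; otherwise
                    // move on to the next chunk.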
1394                    if mask_chunk.chunk_rows == 0 {
1395                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1396                            return Ok(None);
1397                        }
1398                        continue;
1399                    }
1400
1401                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1402
1403                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1404                    if read == 0 {
1405                        return Err(general_err!(
1406                            "reached end of column while expecting {} rows",
1407                            mask_chunk.chunk_rows
1408                        ));
1409                    }
1410                    if read != mask_chunk.chunk_rows {
1411                        return Err(general_err!(
1412                            "insufficient rows read from array reader - expected {}, got {}",
1413                            mask_chunk.chunk_rows,
1414                            read
1415                        ));
1416                    }
1417
1418                    let array = self.array_reader.consume_batch()?;
1419                    // The column reader exposes the projection as a struct array; convert this
1420                    // into a record batch before applying the boolean filter mask.
1421                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1422                        ArrowError::ParquetError(
1423                            "Struct array reader should return struct array".to_string(),
1424                        )
1425                    })?;
1426
1427                    let filtered_batch =
1428                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1429
1430                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1431                        return Err(general_err!(
1432                            "filtered rows mismatch selection - expected {}, got {}",
1433                            mask_chunk.selected_rows,
1434                            filtered_batch.num_rows()
1435                        ));
1436                    }
1437
1438                    if filtered_batch.num_rows() == 0 {
1439                        continue;
1440                    }
1441
1442                    return Ok(Some(filtered_batch));
1443                }
1444            }
1445            RowSelectionCursor::Selectors(selectors_cursor) => {
1446                while read_records < batch_size && !selectors_cursor.is_empty() {
1447                    let front = selectors_cursor.next_selector();
1448                    if front.skip {
1449                        let skipped = self.array_reader.skip_records(front.row_count)?;
1450
1451                        if skipped != front.row_count {
1452                            return Err(general_err!(
1453                                "failed to skip rows, expected {}, got {}",
1454                                front.row_count,
1455                                skipped
1456                            ));
1457                        }
1458                        continue;
1459                    }
1460
1461                    // Currently, a `RowSelector` with `row_count == 0` is interpreted as the end of the reader,
1462                    // so skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1463                    if front.row_count == 0 {
1464                        continue;
1465                    }
1466
1467                    // Try to read the records still needed for this batch
1468                    let need_read = batch_size - read_records;
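                    // For example (hypothetical numbers): with batch_size = 8 and
                    // read_records = 5, need_read is 3; a 10-row selector is split into
                    // 3 rows read now and a 7-row selector returned to the cursor.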
1469                    let to_read = match front.row_count.checked_sub(need_read) {
1470                        Some(remaining) if remaining != 0 => {
1471                            // The selector holds more rows than this batch still needs: read only
1472                            // `need_read` rows and return the remainder to the cursor for the next batch.
1473                            selectors_cursor.return_selector(RowSelector::select(remaining));
1474                            need_read
1475                        }
1476                        _ => front.row_count,
1477                    };
1478                    match self.array_reader.read_records(to_read)? {
1479                        0 => break,
1480                        rec => read_records += rec,
1481                    };
1482                }
1483            }
1484            RowSelectionCursor::All => {
1485                self.array_reader.read_records(batch_size)?;
1486            }
1487        };
1488
1489        let array = self.array_reader.consume_batch()?;
1490        let struct_array = array.as_struct_opt().ok_or_else(|| {
1491            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1492        })?;
1493
1494        Ok(if struct_array.len() > 0 {
1495            Some(RecordBatch::from(struct_array))
1496        } else {
1497            None
1498        })
1499    }
1500}
1501
1502impl RecordBatchReader for ParquetRecordBatchReader {
1503    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1504    ///
1505    /// Note that the schema metadata will be stripped here. See
1506    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1507    fn schema(&self) -> SchemaRef {
1508        self.schema.clone()
1509    }
1510}
1511
1512impl ParquetRecordBatchReader {
1513    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1514    ///
1515    /// See [`ParquetRecordBatchReaderBuilder`] for more options
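    ///
    /// A minimal sketch, assuming `data` is a [`bytes::Bytes`] buffer holding a
    /// Parquet file:
    ///
    /// ```ignore
    /// let reader = ParquetRecordBatchReader::try_new(data, 1024)?;
    /// let batches = reader.collect::<Result<Vec<_>, _>>()?;
    /// ```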
1516    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1517        ParquetRecordBatchReaderBuilder::try_new(reader)?
1518            .with_batch_size(batch_size)
1519            .build()
1520    }
1521
1522    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1523    ///
1524    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
1525    /// higher-level interface for reading parquet data from a file
1526    pub fn try_new_with_row_groups(
1527        levels: &FieldLevels,
1528        row_groups: &dyn RowGroups,
1529        batch_size: usize,
1530        selection: Option<RowSelection>,
1531    ) -> Result<Self> {
1532        // note metrics are not supported in this API
1533        let metrics = ArrowReaderMetrics::disabled();
1534        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1535            .with_parquet_metadata(row_groups.metadata())
1536            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1537
1538        let read_plan = ReadPlanBuilder::new(batch_size)
1539            .with_selection(selection)
1540            .build();
1541
1542        Ok(Self {
1543            array_reader,
1544            schema: Arc::new(Schema::new(levels.fields.clone())),
1545            read_plan,
1546        })
1547    }
1548
1549    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1550    /// a time from [`ArrayReader`] based on the provided `read_plan`. If the plan contains no
1551    /// selection, all rows will be returned
1552    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1553        let schema = match array_reader.get_data_type() {
1554            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1555            _ => unreachable!("Struct array reader's data type is not struct!"),
1556        };
1557
1558        Self {
1559            array_reader,
1560            schema: Arc::new(schema),
1561            read_plan,
1562        }
1563    }
1564
1565    #[inline(always)]
1566    pub(crate) fn batch_size(&self) -> usize {
1567        self.read_plan.batch_size()
1568    }
1569}
1570
1571#[cfg(test)]
1572pub(crate) mod tests {
1573    use std::cmp::min;
1574    use std::collections::{HashMap, VecDeque};
1575    use std::fmt::Formatter;
1576    use std::fs::File;
1577    use std::io::Seek;
1578    use std::path::PathBuf;
1579    use std::sync::Arc;
1580
1581    use rand::rngs::StdRng;
1582    use rand::{Rng, RngCore, SeedableRng, random, rng};
1583    use tempfile::tempfile;
1584
1585    use crate::arrow::arrow_reader::{
1586        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
1587        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
1588        RowSelectionPolicy, RowSelector,
1589    };
1590    use crate::arrow::schema::{
1591        add_encoded_arrow_schema_to_metadata,
1592        virtual_type::{RowGroupIndex, RowNumber},
1593    };
1594    use crate::arrow::{ArrowWriter, ProjectionMask};
1595    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1596    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1597    use crate::data_type::{
1598        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1599        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1600    };
1601    use crate::errors::Result;
1602    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1603    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1604    use crate::file::writer::SerializedFileWriter;
1605    use crate::schema::parser::parse_message_type;
1606    use crate::schema::types::{Type, TypePtr};
1607    use crate::util::test_common::rand_gen::RandGen;
1608    use arrow::compute::kernels::cmp::eq;
1609    use arrow::compute::or;
1610    use arrow_array::builder::*;
1611    use arrow_array::cast::AsArray;
1612    use arrow_array::types::{
1613        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1614        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1615        Time64MicrosecondType,
1616    };
1617    use arrow_array::*;
1618    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1619    use arrow_data::{ArrayData, ArrayDataBuilder};
1620    use arrow_schema::{
1621        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1622    };
1623    use arrow_select::concat::concat_batches;
1624    use bytes::Bytes;
1625    use half::f16;
1626    use num_traits::PrimInt;
1627
1628    #[test]
1629    fn test_arrow_reader_all_columns() {
1630        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1631
1632        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1633        let original_schema = Arc::clone(builder.schema());
1634        let reader = builder.build().unwrap();
1635
1636        // Verify that the schema was correctly parsed
1637        assert_eq!(original_schema.fields(), reader.schema().fields());
1638    }
1639
1640    #[test]
1641    fn test_reuse_schema() {
1642        let file = get_test_file("parquet/alltypes-java.parquet");
1643
1644        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1645        let expected = builder.metadata;
1646        let schema = expected.file_metadata().schema_descr_ptr();
1647
1648        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1649        let builder =
1650            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1651
1652        // Verify that the metadata matches
1653        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1654    }
1655
1656    #[test]
1657    fn test_page_encoding_stats_mask() {
1658        let testdata = arrow::util::test_util::parquet_test_data();
1659        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1660        let file = File::open(path).unwrap();
1661
1662        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1663        let builder =
1664            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1665
1666        let row_group_metadata = builder.metadata.row_group(0);
1667
1668        // test page encoding stats
1669        let page_encoding_stats = row_group_metadata
1670            .column(0)
1671            .page_encoding_stats_mask()
1672            .unwrap();
1673        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1674        let page_encoding_stats = row_group_metadata
1675            .column(2)
1676            .page_encoding_stats_mask()
1677            .unwrap();
1678        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1679    }
1680
1681    #[test]
1682    fn test_stats_stats_skipped() {
1683        let testdata = arrow::util::test_util::parquet_test_data();
1684        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1685        let file = File::open(path).unwrap();
1686
1687        // test skipping all
1688        let arrow_options = ArrowReaderOptions::new()
1689            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1690            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1691        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1692            file.try_clone().unwrap(),
1693            arrow_options,
1694        )
1695        .unwrap();
1696
1697        let row_group_metadata = builder.metadata.row_group(0);
1698        for column in row_group_metadata.columns() {
1699            assert!(column.page_encoding_stats().is_none());
1700            assert!(column.page_encoding_stats_mask().is_none());
1701            assert!(column.statistics().is_none());
1702        }
1703
1704        // test skipping all but one column and converting to mask
1705        let arrow_options = ArrowReaderOptions::new()
1706            .with_encoding_stats_as_mask(true)
1707            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1708            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1709        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1710            file.try_clone().unwrap(),
1711            arrow_options,
1712        )
1713        .unwrap();
1714
1715        let row_group_metadata = builder.metadata.row_group(0);
1716        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1717            assert!(column.page_encoding_stats().is_none());
1718            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1719            assert_eq!(column.statistics().is_some(), idx == 0);
1720        }
1721    }
1722
1723    #[test]
1724    fn test_size_stats_stats_skipped() {
1725        let testdata = arrow::util::test_util::parquet_test_data();
1726        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1727        let file = File::open(path).unwrap();
1728
1729        // test skipping all
1730        let arrow_options =
1731            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1732        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1733            file.try_clone().unwrap(),
1734            arrow_options,
1735        )
1736        .unwrap();
1737
1738        let row_group_metadata = builder.metadata.row_group(0);
1739        for column in row_group_metadata.columns() {
1740            assert!(column.repetition_level_histogram().is_none());
1741            assert!(column.definition_level_histogram().is_none());
1742            assert!(column.unencoded_byte_array_data_bytes().is_none());
1743        }
1744
1745        // test skipping all but one column and converting to mask
1746        let arrow_options = ArrowReaderOptions::new()
1747            .with_encoding_stats_as_mask(true)
1748            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1749        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1750            file.try_clone().unwrap(),
1751            arrow_options,
1752        )
1753        .unwrap();
1754
1755        let row_group_metadata = builder.metadata.row_group(0);
1756        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1757            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1758            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1759            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1760        }
1761    }
1762
1763    #[test]
1764    fn test_arrow_reader_single_column() {
1765        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1766
1767        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1768        let original_schema = Arc::clone(builder.schema());
1769
1770        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1771        let reader = builder.with_projection(mask).build().unwrap();
1772
1773        // Verify that the schema was correctly parsed
1774        assert_eq!(1, reader.schema().fields().len());
1775        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1776    }
1777
1778    #[test]
1779    fn test_arrow_reader_single_column_by_name() {
1780        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1781
1782        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1783        let original_schema = Arc::clone(builder.schema());
1784
1785        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1786        let reader = builder.with_projection(mask).build().unwrap();
1787
1788        // Verify that the schema was correctly parsed
1789        assert_eq!(1, reader.schema().fields().len());
1790        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1791    }
1792
1793    #[test]
1794    fn test_null_column_reader_test() {
1795        let mut file = tempfile::tempfile().unwrap();
1796
1797        let schema = "
1798            message message {
1799                OPTIONAL INT32 int32;
1800            }
1801        ";
1802        let schema = Arc::new(parse_message_type(schema).unwrap());
1803
1804        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1805        generate_single_column_file_with_data::<Int32Type>(
1806            &[vec![], vec![]],
1807            Some(&def_levels),
1808            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1809            schema,
1810            Some(Field::new("int32", ArrowDataType::Null, true)),
1811            &Default::default(),
1812        )
1813        .unwrap();
1814
1815        file.rewind().unwrap();
1816
1817        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1818        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1819
1820        assert_eq!(batches.len(), 4);
1821        for batch in &batches[0..3] {
1822            assert_eq!(batch.num_rows(), 2);
1823            assert_eq!(batch.num_columns(), 1);
1824            assert_eq!(batch.column(0).null_count(), 2);
1825        }
1826
1827        assert_eq!(batches[3].num_rows(), 1);
1828        assert_eq!(batches[3].num_columns(), 1);
1829        assert_eq!(batches[3].column(0).null_count(), 1);
1830    }
1831
1832    #[test]
1833    fn test_primitive_single_column_reader_test() {
1834        run_single_column_reader_tests::<BoolType, _, BoolType>(
1835            2,
1836            ConvertedType::NONE,
1837            None,
1838            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1839            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1840        );
1841        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1842            2,
1843            ConvertedType::NONE,
1844            None,
1845            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1846            &[
1847                Encoding::PLAIN,
1848                Encoding::RLE_DICTIONARY,
1849                Encoding::DELTA_BINARY_PACKED,
1850                Encoding::BYTE_STREAM_SPLIT,
1851            ],
1852        );
1853        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1854            2,
1855            ConvertedType::NONE,
1856            None,
1857            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1858            &[
1859                Encoding::PLAIN,
1860                Encoding::RLE_DICTIONARY,
1861                Encoding::DELTA_BINARY_PACKED,
1862                Encoding::BYTE_STREAM_SPLIT,
1863            ],
1864        );
1865        run_single_column_reader_tests::<FloatType, _, FloatType>(
1866            2,
1867            ConvertedType::NONE,
1868            None,
1869            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1870            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1871        );
1872    }
1873
1874    #[test]
1875    fn test_unsigned_primitive_single_column_reader_test() {
1876        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1877            2,
1878            ConvertedType::UINT_32,
1879            Some(ArrowDataType::UInt32),
1880            |vals| {
1881                Arc::new(UInt32Array::from_iter(
1882                    vals.iter().map(|x| x.map(|x| x as u32)),
1883                ))
1884            },
1885            &[
1886                Encoding::PLAIN,
1887                Encoding::RLE_DICTIONARY,
1888                Encoding::DELTA_BINARY_PACKED,
1889            ],
1890        );
1891        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1892            2,
1893            ConvertedType::UINT_64,
1894            Some(ArrowDataType::UInt64),
1895            |vals| {
1896                Arc::new(UInt64Array::from_iter(
1897                    vals.iter().map(|x| x.map(|x| x as u64)),
1898                ))
1899            },
1900            &[
1901                Encoding::PLAIN,
1902                Encoding::RLE_DICTIONARY,
1903                Encoding::DELTA_BINARY_PACKED,
1904            ],
1905        );
1906    }
1907
1908    #[test]
1909    fn test_unsigned_roundtrip() {
1910        let schema = Arc::new(Schema::new(vec![
1911            Field::new("uint32", ArrowDataType::UInt32, true),
1912            Field::new("uint64", ArrowDataType::UInt64, true),
1913        ]));
1914
1915        let mut buf = Vec::with_capacity(1024);
1916        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1917
1918        let original = RecordBatch::try_new(
1919            schema,
1920            vec![
1921                Arc::new(UInt32Array::from_iter_values([
1922                    0,
1923                    i32::MAX as u32,
1924                    u32::MAX,
1925                ])),
1926                Arc::new(UInt64Array::from_iter_values([
1927                    0,
1928                    i64::MAX as u64,
1929                    u64::MAX,
1930                ])),
1931            ],
1932        )
1933        .unwrap();
1934
1935        writer.write(&original).unwrap();
1936        writer.close().unwrap();
1937
1938        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1939        let ret = reader.next().unwrap().unwrap();
1940        assert_eq!(ret, original);
1941
1942        // Check they can be downcast to the correct type
1943        ret.column(0)
1944            .as_any()
1945            .downcast_ref::<UInt32Array>()
1946            .unwrap();
1947
1948        ret.column(1)
1949            .as_any()
1950            .downcast_ref::<UInt64Array>()
1951            .unwrap();
1952    }
1953
1954    #[test]
1955    fn test_float16_roundtrip() -> Result<()> {
1956        let schema = Arc::new(Schema::new(vec![
1957            Field::new("float16", ArrowDataType::Float16, false),
1958            Field::new("float16-nullable", ArrowDataType::Float16, true),
1959        ]));
1960
1961        let mut buf = Vec::with_capacity(1024);
1962        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1963
1964        let original = RecordBatch::try_new(
1965            schema,
1966            vec![
1967                Arc::new(Float16Array::from_iter_values([
1968                    f16::EPSILON,
1969                    f16::MIN,
1970                    f16::MAX,
1971                    f16::NAN,
1972                    f16::INFINITY,
1973                    f16::NEG_INFINITY,
1974                    f16::ONE,
1975                    f16::NEG_ONE,
1976                    f16::ZERO,
1977                    f16::NEG_ZERO,
1978                    f16::E,
1979                    f16::PI,
1980                    f16::FRAC_1_PI,
1981                ])),
1982                Arc::new(Float16Array::from(vec![
1983                    None,
1984                    None,
1985                    None,
1986                    Some(f16::NAN),
1987                    Some(f16::INFINITY),
1988                    Some(f16::NEG_INFINITY),
1989                    None,
1990                    None,
1991                    None,
1992                    None,
1993                    None,
1994                    None,
1995                    Some(f16::FRAC_1_PI),
1996                ])),
1997            ],
1998        )?;
1999
2000        writer.write(&original)?;
2001        writer.close()?;
2002
2003        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2004        let ret = reader.next().unwrap()?;
2005        assert_eq!(ret, original);
2006
2007        // Ensure they can be downcast to the correct type
2008        ret.column(0).as_primitive::<Float16Type>();
2009        ret.column(1).as_primitive::<Float16Type>();
2010
2011        Ok(())
2012    }
2013
2014    #[test]
2015    fn test_time_utc_roundtrip() -> Result<()> {
2016        let schema = Arc::new(Schema::new(vec![
2017            Field::new(
2018                "time_millis",
2019                ArrowDataType::Time32(TimeUnit::Millisecond),
2020                true,
2021            )
2022            .with_metadata(HashMap::from_iter(vec![(
2023                "adjusted_to_utc".to_string(),
2024                "".to_string(),
2025            )])),
2026            Field::new(
2027                "time_micros",
2028                ArrowDataType::Time64(TimeUnit::Microsecond),
2029                true,
2030            )
2031            .with_metadata(HashMap::from_iter(vec![(
2032                "adjusted_to_utc".to_string(),
2033                "".to_string(),
2034            )])),
2035        ]));
2036
2037        let mut buf = Vec::with_capacity(1024);
2038        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2039
2040        let original = RecordBatch::try_new(
2041            schema,
2042            vec![
2043                Arc::new(Time32MillisecondArray::from(vec![
2044                    Some(-1),
2045                    Some(0),
2046                    Some(86_399_000),
2047                    Some(86_400_000),
2048                    Some(86_401_000),
2049                    None,
2050                ])),
2051                Arc::new(Time64MicrosecondArray::from(vec![
2052                    Some(-1),
2053                    Some(0),
2054                    Some(86_399 * 1_000_000),
2055                    Some(86_400 * 1_000_000),
2056                    Some(86_401 * 1_000_000),
2057                    None,
2058                ])),
2059            ],
2060        )?;
2061
2062        writer.write(&original)?;
2063        writer.close()?;
2064
2065        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2066        let ret = reader.next().unwrap()?;
2067        assert_eq!(ret, original);
2068
2069        // Ensure they can be downcast to the correct type
2070        ret.column(0).as_primitive::<Time32MillisecondType>();
2071        ret.column(1).as_primitive::<Time64MicrosecondType>();
2072
2073        Ok(())
2074    }
2075
2076    #[test]
2077    fn test_date32_roundtrip() -> Result<()> {
2078        use arrow_array::Date32Array;
2079
2080        let schema = Arc::new(Schema::new(vec![Field::new(
2081            "date32",
2082            ArrowDataType::Date32,
2083            false,
2084        )]));
2085
2086        let mut buf = Vec::with_capacity(1024);
2087
2088        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2089
2090        let original = RecordBatch::try_new(
2091            schema,
2092            vec![Arc::new(Date32Array::from(vec![
2093                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2094            ]))],
2095        )?;
2096
2097        writer.write(&original)?;
2098        writer.close()?;
2099
2100        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2101        let ret = reader.next().unwrap()?;
2102        assert_eq!(ret, original);
2103
2104        // Ensure it can be downcast to the correct type
2105        ret.column(0).as_primitive::<Date32Type>();
2106
2107        Ok(())
2108    }
2109
2110    #[test]
2111    fn test_date64_roundtrip() -> Result<()> {
2112        use arrow_array::Date64Array;
2113
2114        let schema = Arc::new(Schema::new(vec![
2115            Field::new("small-date64", ArrowDataType::Date64, false),
2116            Field::new("big-date64", ArrowDataType::Date64, false),
2117            Field::new("invalid-date64", ArrowDataType::Date64, false),
2118        ]));
2119
2120        let mut default_buf = Vec::with_capacity(1024);
2121        let mut coerce_buf = Vec::with_capacity(1024);
2122
2123        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2124
2125        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2126        let mut coerce_writer =
2127            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2128
2129        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2130
2131        let original = RecordBatch::try_new(
2132            schema,
2133            vec![
2134                // small-date64
2135                Arc::new(Date64Array::from(vec![
2136                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2137                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2138                    0,
2139                    1_000 * NUM_MILLISECONDS_IN_DAY,
2140                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2141                ])),
2142                // big-date64
2143                Arc::new(Date64Array::from(vec![
2144                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2145                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2146                    0,
2147                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2148                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2149                ])),
2150                // invalid-date64
2151                Arc::new(Date64Array::from(vec![
2152                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2153                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2154                    1,
2155                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2156                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2157                ])),
2158            ],
2159        )?;
2160
2161        default_writer.write(&original)?;
2162        coerce_writer.write(&original)?;
2163
2164        default_writer.close()?;
2165        coerce_writer.close()?;
2166
2167        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2168        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2169
2170        let default_ret = default_reader.next().unwrap()?;
2171        let coerce_ret = coerce_reader.next().unwrap()?;
2172
2173        // Roundtrip should be successful when default writer used
2174        assert_eq!(default_ret, original);
2175
2176        // Only small-date64 should roundtrip successfully when coerce_types writer is used
2177        assert_eq!(coerce_ret.column(0), original.column(0));
2178        assert_ne!(coerce_ret.column(1), original.column(1));
2179        assert_ne!(coerce_ret.column(2), original.column(2));
2180
2181        // Ensure both can be downcast to the correct type
2182        default_ret.column(0).as_primitive::<Date64Type>();
2183        coerce_ret.column(0).as_primitive::<Date64Type>();
2184
2185        Ok(())
2186    }
2187    struct RandFixedLenGen {}
2188
2189    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2190        fn r#gen(len: i32) -> FixedLenByteArray {
2191            let mut v = vec![0u8; len as usize];
2192            rng().fill_bytes(&mut v);
2193            ByteArray::from(v).into()
2194        }
2195    }
2196
2197    #[test]
2198    fn test_fixed_length_binary_column_reader() {
2199        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2200            20,
2201            ConvertedType::NONE,
2202            None,
2203            |vals| {
2204                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2205                for val in vals {
2206                    match val {
2207                        Some(b) => builder.append_value(b).unwrap(),
2208                        None => builder.append_null(),
2209                    }
2210                }
2211                Arc::new(builder.finish())
2212            },
2213            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2214        );
2215    }
2216
2217    #[test]
2218    fn test_interval_day_time_column_reader() {
2219        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2220            12,
2221            ConvertedType::INTERVAL,
2222            None,
2223            |vals| {
2224                Arc::new(
2225                    vals.iter()
2226                        .map(|x| {
2227                            x.as_ref().map(|b| IntervalDayTime {
2228                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2229                                milliseconds: i32::from_le_bytes(
2230                                    b.as_ref()[8..12].try_into().unwrap(),
2231                                ),
2232                            })
2233                        })
2234                        .collect::<IntervalDayTimeArray>(),
2235                )
2236            },
2237            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2238        );
2239    }
2240
2241    #[test]
2242    fn test_int96_single_column_reader_test() {
2243        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2244
2245        type TypeHintAndConversionFunction =
2246            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2247
2248        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2249            // Test without a specified ArrowType hint.
2250            (None, |vals: &[Option<Int96>]| {
2251                Arc::new(TimestampNanosecondArray::from_iter(
2252                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2253                )) as ArrayRef
2254            }),
2255            // Test other TimeUnits as ArrowType hints.
2256            (
2257                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2258                |vals: &[Option<Int96>]| {
2259                    Arc::new(TimestampSecondArray::from_iter(
2260                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2261                    )) as ArrayRef
2262                },
2263            ),
2264            (
2265                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2266                |vals: &[Option<Int96>]| {
2267                    Arc::new(TimestampMillisecondArray::from_iter(
2268                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2269                    )) as ArrayRef
2270                },
2271            ),
2272            (
2273                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2274                |vals: &[Option<Int96>]| {
2275                    Arc::new(TimestampMicrosecondArray::from_iter(
2276                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2277                    )) as ArrayRef
2278                },
2279            ),
2280            (
2281                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2282                |vals: &[Option<Int96>]| {
2283                    Arc::new(TimestampNanosecondArray::from_iter(
2284                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2285                    )) as ArrayRef
2286                },
2287            ),
2288            // Test another timezone with TimeUnit as ArrowType hints.
2289            (
2290                Some(ArrowDataType::Timestamp(
2291                    TimeUnit::Second,
2292                    Some(Arc::from("-05:00")),
2293                )),
2294                |vals: &[Option<Int96>]| {
2295                    Arc::new(
2296                        TimestampSecondArray::from_iter(
2297                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2298                        )
2299                        .with_timezone("-05:00"),
2300                    ) as ArrayRef
2301                },
2302            ),
2303        ];
2304
2305        resolutions.iter().for_each(|(arrow_type, converter)| {
2306            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2307                2,
2308                ConvertedType::NONE,
2309                arrow_type.clone(),
2310                converter,
2311                encodings,
2312            );
2313        })
2314    }
2315
2316    #[test]
2317    fn test_int96_from_spark_file_with_provided_schema() {
2318        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2319        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2320        // microsecond resolution for the Parquet reader to interpret these values correctly.
2321        use arrow_schema::DataType::Timestamp;
2322        let test_data = arrow::util::test_util::parquet_test_data();
2323        let path = format!("{test_data}/int96_from_spark.parquet");
2324        let file = File::open(path).unwrap();
2325
2326        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2327            "a",
2328            Timestamp(TimeUnit::Microsecond, None),
2329            true,
2330        )]));
2331        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2332
2333        let mut record_reader =
2334            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2335                .unwrap()
2336                .build()
2337                .unwrap();
2338
2339        let batch = record_reader.next().unwrap().unwrap();
2340        assert_eq!(batch.num_columns(), 1);
2341        let column = batch.column(0);
2342        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2343
2344        let expected = Arc::new(Int64Array::from(vec![
2345            Some(1704141296123456),
2346            Some(1704070800000000),
2347            Some(253402225200000000),
2348            Some(1735599600000000),
2349            None,
2350            Some(9089380393200000000),
2351        ]));
2352
2353        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2354        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2355        // anyway, so this should be a zero-copy non-modifying cast.
2356
2357        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2358        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2359
2360        assert_eq!(casted_timestamps.len(), expected.len());
2361
2362        casted_timestamps
2363            .iter()
2364            .zip(expected.iter())
2365            .for_each(|(lhs, rhs)| {
2366                assert_eq!(lhs, rhs);
2367            });
2368    }
2369
2370    #[test]
2371    fn test_int96_from_spark_file_without_provided_schema() {
2372        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2373        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2374        // values when read as nanosecond resolution overflow and result in garbage values.
2375        use arrow_schema::DataType::Timestamp;
2376        let test_data = arrow::util::test_util::parquet_test_data();
2377        let path = format!("{test_data}/int96_from_spark.parquet");
2378        let file = File::open(path).unwrap();
2379
2380        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2381            .unwrap()
2382            .build()
2383            .unwrap();
2384
2385        let batch = record_reader.next().unwrap().unwrap();
2386        assert_eq!(batch.num_columns(), 1);
2387        let column = batch.column(0);
2388        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2389
2390        let expected = Arc::new(Int64Array::from(vec![
2391            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2392            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2393            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2394            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2395            None,
2396            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2397        ]));
2398
2399        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2400        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2401        // anyway, so this should be a zero-copy non-modifying cast.
2402
2403        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2404        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2405
2406        assert_eq!(casted_timestamps.len(), expected.len());
2407
2408        casted_timestamps
2409            .iter()
2410            .zip(expected.iter())
2411            .for_each(|(lhs, rhs)| {
2412                assert_eq!(lhs, rhs);
2413            });
2414    }
2415
2416    struct RandUtf8Gen {}
2417
2418    impl RandGen<ByteArrayType> for RandUtf8Gen {
2419        fn r#gen(len: i32) -> ByteArray {
2420            Int32Type::r#gen(len).to_string().as_str().into()
2421        }
2422    }
2423
2424    #[test]
2425    fn test_utf8_single_column_reader_test() {
2426        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2427            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2428                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2429            })))
2430        }
2431
2432        let encodings = &[
2433            Encoding::PLAIN,
2434            Encoding::RLE_DICTIONARY,
2435            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2436            Encoding::DELTA_BYTE_ARRAY,
2437        ];
2438
2439        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2440            2,
2441            ConvertedType::NONE,
2442            None,
2443            |vals| {
2444                Arc::new(BinaryArray::from_iter(
2445                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2446                ))
2447            },
2448            encodings,
2449        );
2450
2451        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2452            2,
2453            ConvertedType::UTF8,
2454            None,
2455            string_converter::<i32>,
2456            encodings,
2457        );
2458
2459        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2460            2,
2461            ConvertedType::UTF8,
2462            Some(ArrowDataType::Utf8),
2463            string_converter::<i32>,
2464            encodings,
2465        );
2466
2467        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2468            2,
2469            ConvertedType::UTF8,
2470            Some(ArrowDataType::LargeUtf8),
2471            string_converter::<i64>,
2472            encodings,
2473        );
2474
2475        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2476        for key in &small_key_types {
2477            for encoding in encodings {
2478                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2479                opts.encoding = *encoding;
2480
2481                let data_type =
2482                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2483
2484                // Cannot run full test suite as keys overflow, run small test instead
2485                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2486                    opts,
2487                    2,
2488                    ConvertedType::UTF8,
2489                    Some(data_type.clone()),
2490                    move |vals| {
2491                        let vals = string_converter::<i32>(vals);
2492                        arrow::compute::cast(&vals, &data_type).unwrap()
2493                    },
2494                );
2495            }
2496        }
2497
2498        let key_types = [
2499            ArrowDataType::Int16,
2500            ArrowDataType::UInt16,
2501            ArrowDataType::Int32,
2502            ArrowDataType::UInt32,
2503            ArrowDataType::Int64,
2504            ArrowDataType::UInt64,
2505        ];
2506
2507        for key in &key_types {
2508            let data_type =
2509                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2510
2511            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2512                2,
2513                ConvertedType::UTF8,
2514                Some(data_type.clone()),
2515                move |vals| {
2516                    let vals = string_converter::<i32>(vals);
2517                    arrow::compute::cast(&vals, &data_type).unwrap()
2518                },
2519                encodings,
2520            );
2521
2522            let data_type = ArrowDataType::Dictionary(
2523                Box::new(key.clone()),
2524                Box::new(ArrowDataType::LargeUtf8),
2525            );
2526
2527            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2528                2,
2529                ConvertedType::UTF8,
2530                Some(data_type.clone()),
2531                move |vals| {
2532                    let vals = string_converter::<i64>(vals);
2533                    arrow::compute::cast(&vals, &data_type).unwrap()
2534                },
2535                encodings,
2536            );
2537        }
2538    }
2539
2540    #[test]
2541    fn test_decimal_nullable_struct() {
2542        let decimals = Decimal256Array::from_iter_values(
2543            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2544        );
2545
2546        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2547            "decimals",
2548            decimals.data_type().clone(),
2549            false,
2550        )])))
2551        .len(8)
2552        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2553        .child_data(vec![decimals.into_data()])
2554        .build()
2555        .unwrap();
2556
2557        let written =
2558            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2559                .unwrap();
2560
2561        let mut buffer = Vec::with_capacity(1024);
2562        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2563        writer.write(&written).unwrap();
2564        writer.close().unwrap();
2565
2566        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2567            .unwrap()
2568            .collect::<Result<Vec<_>, _>>()
2569            .unwrap();
2570
2571        assert_eq!(&written.slice(0, 3), &read[0]);
2572        assert_eq!(&written.slice(3, 3), &read[1]);
2573        assert_eq!(&written.slice(6, 2), &read[2]);
2574    }
2575
2576    #[test]
2577    fn test_int32_nullable_struct() {
2578        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2579        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2580            "int32",
2581            int32.data_type().clone(),
2582            false,
2583        )])))
2584        .len(8)
2585        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2586        .child_data(vec![int32.into_data()])
2587        .build()
2588        .unwrap();
2589
2590        let written =
2591            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2592                .unwrap();
2593
2594        let mut buffer = Vec::with_capacity(1024);
2595        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2596        writer.write(&written).unwrap();
2597        writer.close().unwrap();
2598
2599        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2600            .unwrap()
2601            .collect::<Result<Vec<_>, _>>()
2602            .unwrap();
2603
2604        assert_eq!(&written.slice(0, 3), &read[0]);
2605        assert_eq!(&written.slice(3, 3), &read[1]);
2606        assert_eq!(&written.slice(6, 2), &read[2]);
2607    }
2608
2609    #[test]
2610    fn test_decimal_list() {
2611        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2612
2613        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2614        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2615            decimals.data_type().clone(),
2616            false,
2617        ))))
2618        .len(7)
2619        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2620        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2621        .child_data(vec![decimals.into_data()])
2622        .build()
2623        .unwrap();
2624
2625        let written =
2626            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2627                .unwrap();
2628
2629        let mut buffer = Vec::with_capacity(1024);
2630        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2631        writer.write(&written).unwrap();
2632        writer.close().unwrap();
2633
2634        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2635            .unwrap()
2636            .collect::<Result<Vec<_>, _>>()
2637            .unwrap();
2638
2639        assert_eq!(&written.slice(0, 3), &read[0]);
2640        assert_eq!(&written.slice(3, 3), &read[1]);
2641        assert_eq!(&written.slice(6, 1), &read[2]);
2642    }
2643
2644    #[test]
2645    fn test_read_decimal_file() {
2646        use arrow_array::Decimal128Array;
2647        let testdata = arrow::util::test_util::parquet_test_data();
2648        let file_variants = vec![
2649            ("byte_array", 4),
2650            ("fixed_length", 25),
2651            ("int32", 4),
2652            ("int64", 10),
2653        ];
2654        for (prefix, target_precision) in file_variants {
2655            let path = format!("{testdata}/{prefix}_decimal.parquet");
2656            let file = File::open(path).unwrap();
2657            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2658
2659            let batch = record_reader.next().unwrap().unwrap();
2660            assert_eq!(batch.num_rows(), 24);
2661            let col = batch
2662                .column(0)
2663                .as_any()
2664                .downcast_ref::<Decimal128Array>()
2665                .unwrap();
2666
2667            let expected = 1..25;
2668
2669            assert_eq!(col.precision(), target_precision);
2670            assert_eq!(col.scale(), 2);
2671
2672            for (i, v) in expected.enumerate() {
2673                assert_eq!(col.value(i), v * 100_i128);
2674            }
2675        }
2676    }
2677
2678    #[test]
2679    fn test_read_float16_nonzeros_file() {
2680        use arrow_array::Float16Array;
2681        let testdata = arrow::util::test_util::parquet_test_data();
2682        // see https://github.com/apache/parquet-testing/pull/40
2683        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2684        let file = File::open(path).unwrap();
2685        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2686
2687        let batch = record_reader.next().unwrap().unwrap();
2688        assert_eq!(batch.num_rows(), 8);
2689        let col = batch
2690            .column(0)
2691            .as_any()
2692            .downcast_ref::<Float16Array>()
2693            .unwrap();
2694
2695        let f16_two = f16::ONE + f16::ONE;
2696
2697        assert_eq!(col.null_count(), 1);
2698        assert!(col.is_null(0));
2699        assert_eq!(col.value(1), f16::ONE);
2700        assert_eq!(col.value(2), -f16_two);
2701        assert!(col.value(3).is_nan());
2702        assert_eq!(col.value(4), f16::ZERO);
2703        assert!(col.value(4).is_sign_positive());
2704        assert_eq!(col.value(5), f16::NEG_ONE);
2705        assert_eq!(col.value(6), f16::NEG_ZERO);
2706        assert!(col.value(6).is_sign_negative());
2707        assert_eq!(col.value(7), f16_two);
2708    }
2709
2710    #[test]
2711    fn test_read_float16_zeros_file() {
2712        use arrow_array::Float16Array;
2713        let testdata = arrow::util::test_util::parquet_test_data();
2714        // see https://github.com/apache/parquet-testing/pull/40
2715        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2716        let file = File::open(path).unwrap();
2717        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2718
2719        let batch = record_reader.next().unwrap().unwrap();
2720        assert_eq!(batch.num_rows(), 3);
2721        let col = batch
2722            .column(0)
2723            .as_any()
2724            .downcast_ref::<Float16Array>()
2725            .unwrap();
2726
2727        assert_eq!(col.null_count(), 1);
2728        assert!(col.is_null(0));
2729        assert_eq!(col.value(1), f16::ZERO);
2730        assert!(col.value(1).is_sign_positive());
2731        assert!(col.value(2).is_nan());
2732    }
2733
2734    #[test]
2735    fn test_read_float32_float64_byte_stream_split() {
2736        let path = format!(
2737            "{}/byte_stream_split.zstd.parquet",
2738            arrow::util::test_util::parquet_test_data(),
2739        );
2740        let file = File::open(path).unwrap();
2741        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2742
2743        let mut row_count = 0;
2744        for batch in record_reader {
2745            let batch = batch.unwrap();
2746            row_count += batch.num_rows();
2747            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2748            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2749
2750            // This file contains floats from a standard normal distribution
2751            for &x in f32_col.values() {
2752                assert!(x > -10.0);
2753                assert!(x < 10.0);
2754            }
2755            for &x in f64_col.values() {
2756                assert!(x > -10.0);
2757                assert!(x < 10.0);
2758            }
2759        }
2760        assert_eq!(row_count, 300);
2761    }
2762
2763    #[test]
2764    fn test_read_extended_byte_stream_split() {
2765        let path = format!(
2766            "{}/byte_stream_split_extended.gzip.parquet",
2767            arrow::util::test_util::parquet_test_data(),
2768        );
2769        let file = File::open(path).unwrap();
2770        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2771
2772        let mut row_count = 0;
2773        for batch in record_reader {
2774            let batch = batch.unwrap();
2775            row_count += batch.num_rows();
2776
2777            // 0,1 are f16
2778            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2779            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2780            assert_eq!(f16_col.len(), f16_bss.len());
2781            f16_col
2782                .iter()
2783                .zip(f16_bss.iter())
2784                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2785
2786            // 2,3 are f32
2787            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2788            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2789            assert_eq!(f32_col.len(), f32_bss.len());
2790            f32_col
2791                .iter()
2792                .zip(f32_bss.iter())
2793                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2794
2795            // 4,5 are f64
2796            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2797            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2798            assert_eq!(f64_col.len(), f64_bss.len());
2799            f64_col
2800                .iter()
2801                .zip(f64_bss.iter())
2802                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2803
2804            // 6,7 are i32
2805            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2806            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2807            assert_eq!(i32_col.len(), i32_bss.len());
2808            i32_col
2809                .iter()
2810                .zip(i32_bss.iter())
2811                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2812
2813            // 8,9 are i64
2814            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2815            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2816            assert_eq!(i64_col.len(), i64_bss.len());
2817            i64_col
2818                .iter()
2819                .zip(i64_bss.iter())
2820                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2821
2822            // 10,11 are FLBA(5)
2823            let flba_col = batch.column(10).as_fixed_size_binary();
2824            let flba_bss = batch.column(11).as_fixed_size_binary();
2825            assert_eq!(flba_col.len(), flba_bss.len());
2826            flba_col
2827                .iter()
2828                .zip(flba_bss.iter())
2829                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2830
2831            // 12,13 are FLBA(4) (decimal(7,3))
2832            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2833            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2834            assert_eq!(dec_col.len(), dec_bss.len());
2835            dec_col
2836                .iter()
2837                .zip(dec_bss.iter())
2838                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2839        }
2840        assert_eq!(row_count, 200);
2841    }
2842
2843    #[test]
2844    fn test_read_incorrect_map_schema_file() {
2845        let testdata = arrow::util::test_util::parquet_test_data();
2846        // see https://github.com/apache/parquet-testing/pull/47
2847        let path = format!("{testdata}/incorrect_map_schema.parquet");
2848        let file = File::open(path).unwrap();
2849        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2850
2851        let batch = record_reader.next().unwrap().unwrap();
2852        assert_eq!(batch.num_rows(), 1);
2853
2854        let expected_schema = Schema::new(vec![Field::new(
2855            "my_map",
2856            ArrowDataType::Map(
2857                Arc::new(Field::new(
2858                    "key_value",
2859                    ArrowDataType::Struct(Fields::from(vec![
2860                        Field::new("key", ArrowDataType::Utf8, false),
2861                        Field::new("value", ArrowDataType::Utf8, true),
2862                    ])),
2863                    false,
2864                )),
2865                false,
2866            ),
2867            true,
2868        )]);
2869        assert_eq!(batch.schema().as_ref(), &expected_schema);
2870
2871        assert_eq!(batch.num_rows(), 1);
2872        assert_eq!(batch.column(0).null_count(), 0);
2873        assert_eq!(
2874            batch.column(0).as_map().keys().as_ref(),
2875            &StringArray::from(vec!["parent", "name"])
2876        );
2877        assert_eq!(
2878            batch.column(0).as_map().values().as_ref(),
2879            &StringArray::from(vec!["another", "report"])
2880        );
2881    }
2882
2883    #[test]
2884    fn test_read_dict_fixed_size_binary() {
2885        let schema = Arc::new(Schema::new(vec![Field::new(
2886            "a",
2887            ArrowDataType::Dictionary(
2888                Box::new(ArrowDataType::UInt8),
2889                Box::new(ArrowDataType::FixedSizeBinary(8)),
2890            ),
2891            true,
2892        )]));
2893        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2894        let values = FixedSizeBinaryArray::try_from_iter(
2895            vec![
2896                (0u8..8u8).collect::<Vec<u8>>(),
2897                (24u8..32u8).collect::<Vec<u8>>(),
2898            ]
2899            .into_iter(),
2900        )
2901        .unwrap();
2902        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2903        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2904
2905        let mut buffer = Vec::with_capacity(1024);
2906        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2907        writer.write(&batch).unwrap();
2908        writer.close().unwrap();
2909        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2910            .unwrap()
2911            .collect::<Result<Vec<_>, _>>()
2912            .unwrap();
2913
2914        assert_eq!(read.len(), 1);
2915        assert_eq!(&batch, &read[0])
2916    }
2917
2918    #[test]
2919    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2920        // The `StructArrayReader` checks the definition and repetition levels of the first
2921        // child column in the struct to determine nullability for the struct. If the first
2922        // column is being read by a `ByteArrayDictionaryReader`, we need to ensure that the
2923        // nullability is interpreted correctly from the rep/def level buffers managed by
2924        // that array reader.
2925
2926        let struct_fields = Fields::from(vec![
2927            Field::new(
2928                "city",
2929                ArrowDataType::Dictionary(
2930                    Box::new(ArrowDataType::UInt8),
2931                    Box::new(ArrowDataType::Utf8),
2932                ),
2933                true,
2934            ),
2935            Field::new("name", ArrowDataType::Utf8, true),
2936        ]);
2937        let schema = Arc::new(Schema::new(vec![Field::new(
2938            "items",
2939            ArrowDataType::Struct(struct_fields.clone()),
2940            true,
2941        )]));
2942
2943        let items_arr = StructArray::new(
2944            struct_fields,
2945            vec![
2946                Arc::new(DictionaryArray::new(
2947                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2948                    Arc::new(StringArray::from_iter_values(vec![
2949                        "quebec",
2950                        "fredericton",
2951                        "halifax",
2952                    ])),
2953                )),
2954                Arc::new(StringArray::from_iter_values(vec![
2955                    "albert", "terry", "lance", "", "tim",
2956                ])),
2957            ],
2958            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2959        );
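        // The struct-level validity above marks row 3 as null; its child slots ("" and
        // dictionary key 0 -> "quebec") are just placeholder values.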
2960
2961        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2962        let mut buffer = Vec::with_capacity(1024);
2963        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2964        writer.write(&batch).unwrap();
2965        writer.close().unwrap();
2966        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2967            .unwrap()
2968            .collect::<Result<Vec<_>, _>>()
2969            .unwrap();
2970
2971        assert_eq!(read.len(), 1);
2972        assert_eq!(&batch, &read[0])
2973    }
2974
2975    /// Parameters for single_column_reader_test
2976    #[derive(Clone)]
2977    struct TestOptions {
2978        /// Number of row groups to write to parquet (the file therefore
2979        /// contains num_row_groups * num_rows rows in total)
2980        num_row_groups: usize,
2981        /// Number of rows in each row group
2982        num_rows: usize,
2983        /// Size of batches to read back
2984        record_batch_size: usize,
2985        /// Percentage of null values in the column, or `None` if the column is required (non-nullable)
2986        null_percent: Option<usize>,
2987        /// Set write batch size
2988        ///
2989        /// This is the number of rows that are written at once to a page and
2990        /// therefore acts as a bound on the page granularity of a row group
2991        write_batch_size: usize,
2992        /// Maximum size of page in bytes
2993        max_data_page_size: usize,
2994        /// Maximum size of dictionary page in bytes
2995        max_dict_page_size: usize,
2996        /// Writer version
2997        writer_version: WriterVersion,
2998        /// Enabled statistics
2999        enabled_statistics: EnabledStatistics,
3000        /// Encoding
3001        encoding: Encoding,
3002        /// row selections and total selected row count
3003        row_selections: Option<(RowSelection, usize)>,
3004        /// row filter
3005        row_filter: Option<Vec<bool>>,
3006        /// limit
3007        limit: Option<usize>,
3008        /// offset
3009        offset: Option<usize>,
3010    }
3011
3012    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
3013    impl std::fmt::Debug for TestOptions {
3014        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3015            f.debug_struct("TestOptions")
3016                .field("num_row_groups", &self.num_row_groups)
3017                .field("num_rows", &self.num_rows)
3018                .field("record_batch_size", &self.record_batch_size)
3019                .field("null_percent", &self.null_percent)
3020                .field("write_batch_size", &self.write_batch_size)
3021                .field("max_data_page_size", &self.max_data_page_size)
3022                .field("max_dict_page_size", &self.max_dict_page_size)
3023                .field("writer_version", &self.writer_version)
3024                .field("enabled_statistics", &self.enabled_statistics)
3025                .field("encoding", &self.encoding)
3026                .field("row_selections", &self.row_selections.is_some())
3027                .field("row_filter", &self.row_filter.is_some())
3028                .field("limit", &self.limit)
3029                .field("offset", &self.offset)
3030                .finish()
3031        }
3032    }
3033
3034    impl Default for TestOptions {
3035        fn default() -> Self {
3036            Self {
3037                num_row_groups: 2,
3038                num_rows: 100,
3039                record_batch_size: 15,
3040                null_percent: None,
3041                write_batch_size: 64,
3042                max_data_page_size: 1024 * 1024,
3043                max_dict_page_size: 1024 * 1024,
3044                writer_version: WriterVersion::PARQUET_1_0,
3045                enabled_statistics: EnabledStatistics::Page,
3046                encoding: Encoding::PLAIN,
3047                row_selections: None,
3048                row_filter: None,
3049                limit: None,
3050                offset: None,
3051            }
3052        }
3053    }
3054
3055    impl TestOptions {
3056        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3057            Self {
3058                num_row_groups,
3059                num_rows,
3060                record_batch_size,
3061                ..Default::default()
3062            }
3063        }
3064
3065        fn with_null_percent(self, null_percent: usize) -> Self {
3066            Self {
3067                null_percent: Some(null_percent),
3068                ..self
3069            }
3070        }
3071
3072        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3073            Self {
3074                max_data_page_size,
3075                ..self
3076            }
3077        }
3078
3079        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3080            Self {
3081                max_dict_page_size,
3082                ..self
3083            }
3084        }
3085
3086        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3087            Self {
3088                enabled_statistics,
3089                ..self
3090            }
3091        }
3092
3093        fn with_row_selections(self) -> Self {
3094            assert!(self.row_filter.is_none(), "Must set row selection first");
3095
3096            let mut rng = rng();
3097            let step = rng.random_range(self.record_batch_size..self.num_rows);
3098            let row_selections = create_test_selection(
3099                step,
3100                self.num_row_groups * self.num_rows,
3101                rng.random::<bool>(),
3102            );
3103            Self {
3104                row_selections: Some(row_selections),
3105                ..self
3106            }
3107        }
3108
3109        fn with_row_filter(self) -> Self {
3110            let row_count = match &self.row_selections {
3111                Some((_, count)) => *count,
3112                None => self.num_row_groups * self.num_rows,
3113            };
3114
3115            let mut rng = rng();
3116            Self {
3117                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3118                ..self
3119            }
3120        }
3121
3122        fn with_limit(self, limit: usize) -> Self {
3123            Self {
3124                limit: Some(limit),
3125                ..self
3126            }
3127        }
3128
3129        fn with_offset(self, offset: usize) -> Self {
3130            Self {
3131                offset: Some(offset),
3132                ..self
3133            }
3134        }
3135
3136        fn writer_props(&self) -> WriterProperties {
3137            let builder = WriterProperties::builder()
3138                .set_data_page_size_limit(self.max_data_page_size)
3139                .set_write_batch_size(self.write_batch_size)
3140                .set_writer_version(self.writer_version)
3141                .set_statistics_enabled(self.enabled_statistics);
3142
3143            let builder = match self.encoding {
3144                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3145                    .set_dictionary_enabled(true)
3146                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3147                _ => builder
3148                    .set_dictionary_enabled(false)
3149                    .set_encoding(self.encoding),
3150            };
3151
3152            builder.build()
3153        }
3154    }
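    // `TestOptions` values are built by chaining the helpers above; for example,
    // `all_options` below contains combinations such as:
    //
    //     TestOptions::new(2, 256, 93)
    //         .with_null_percent(25)
    //         .with_max_data_page_size(10)
    //         .with_row_filter()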
3155
3156    /// Create a parquet file and then read it back with a
3157    /// `ParquetRecordBatchReader`, once for every combination in a standard
3158    /// set of `TestOptions`.
3159    ///
3160    /// `rand_max` represents the maximum size of value to pass to the
3161    /// value generator
3162    fn run_single_column_reader_tests<T, F, G>(
3163        rand_max: i32,
3164        converted_type: ConvertedType,
3165        arrow_type: Option<ArrowDataType>,
3166        converter: F,
3167        encodings: &[Encoding],
3168    ) where
3169        T: DataType,
3170        G: RandGen<T>,
3171        F: Fn(&[Option<T::T>]) -> ArrayRef,
3172    {
3173        let all_options = vec![
3174            // choose record_batch_size (15) so batches cross row
3175            // group boundaries (2 row groups of 100 rows each).
3176            TestOptions::new(2, 100, 15),
3177            // choose record_batch_size (5) so batches sometimes fall
3178            // on row group boundaries (3 row groups of 25 rows each).
3179            // Tests buffer refilling edge cases.
3181            TestOptions::new(3, 25, 5),
3182            // Choose record_batch_size (25) so all batches fall
3183            // exactly on row group boundary (25). Tests buffer
3184            // refilling edge cases.
3185            TestOptions::new(4, 100, 25),
3186            // Set maximum page size so row groups have multiple pages
3187            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3188            // Set small dictionary page size to test dictionary fallback
3189            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3190            // Test optional but with no nulls
3191            TestOptions::new(2, 256, 127).with_null_percent(0),
3192            // Test optional with nulls
3193            TestOptions::new(2, 256, 93).with_null_percent(25),
3194            // Test with limit of 0
3195            TestOptions::new(4, 100, 25).with_limit(0),
3196            // Test with limit of 50
3197            TestOptions::new(4, 100, 25).with_limit(50),
3198            // Test with limit smaller than a single batch
3199            TestOptions::new(4, 100, 25).with_limit(10),
3200            // Test with limit larger than a single row group (100 rows)
3201            TestOptions::new(4, 100, 25).with_limit(101),
3202            // Test with limit + offset contained within the first row group
3203            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3204            // Test with limit + offset equal to a single row group (100 rows)
3205            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3206            // Test with limit + offset spanning a row group boundary
3207            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3208            // Test with no page-level statistics
3209            TestOptions::new(2, 256, 91)
3210                .with_null_percent(25)
3211                .with_enabled_statistics(EnabledStatistics::Chunk),
3212            // Test with no statistics
3213            TestOptions::new(2, 256, 91)
3214                .with_null_percent(25)
3215                .with_enabled_statistics(EnabledStatistics::None),
3216            // Test with all null
3217            TestOptions::new(2, 128, 91)
3218                .with_null_percent(100)
3219                .with_enabled_statistics(EnabledStatistics::None),
3220            // Test skip
3221
3222            // choose record_batch_size (15) so batches cross row
3223            // group boundaries (2 row groups of 100 rows each).
3224            TestOptions::new(2, 100, 15).with_row_selections(),
3225            // choose record_batch_size (5) so batches sometimes fall
3226            // on row group boundaries (3 row groups of 25 rows each).
3227            // Tests buffer refilling edge cases.
3229            TestOptions::new(3, 25, 5).with_row_selections(),
3230            // Choose record_batch_size (25) so all batches fall
3231            // exactly on row group boundary (25). Tests buffer
3232            // refilling edge cases.
3233            TestOptions::new(4, 100, 25).with_row_selections(),
3234            // Set maximum page size so row groups have multiple pages
3235            TestOptions::new(3, 256, 73)
3236                .with_max_data_page_size(128)
3237                .with_row_selections(),
3238            // Set small dictionary page size to test dictionary fallback
3239            TestOptions::new(3, 256, 57)
3240                .with_max_dict_page_size(128)
3241                .with_row_selections(),
3242            // Test optional but with no nulls
3243            TestOptions::new(2, 256, 127)
3244                .with_null_percent(0)
3245                .with_row_selections(),
3246            // Test optional with nulls
3247            TestOptions::new(2, 256, 93)
3248                .with_null_percent(25)
3249                .with_row_selections(),
3250            // Test optional with nulls and a limit
3251            TestOptions::new(2, 256, 93)
3252                .with_null_percent(25)
3253                .with_row_selections()
3254                .with_limit(10),
3255            // Test optional with nulls, an offset and a limit
3256            TestOptions::new(2, 256, 93)
3257                .with_null_percent(25)
3258                .with_row_selections()
3259                .with_offset(20)
3260                .with_limit(10),
3261            // Test filter
3262
3263            // Test with row filter
3264            TestOptions::new(4, 100, 25).with_row_filter(),
3265            // Test with row selection and row filter
3266            TestOptions::new(4, 100, 25)
3267                .with_row_selections()
3268                .with_row_filter(),
3269            // Test with nulls, small pages and row filter
3270            TestOptions::new(2, 256, 93)
3271                .with_null_percent(25)
3272                .with_max_data_page_size(10)
3273                .with_row_filter(),
3274            // Test with nulls, row selection, row filter and small pages
3275            TestOptions::new(2, 256, 93)
3276                .with_null_percent(25)
3277                .with_max_data_page_size(10)
3278                .with_row_selections()
3279                .with_row_filter(),
3280            // Test with row selection and no offset index and small pages
3281            TestOptions::new(2, 256, 93)
3282                .with_enabled_statistics(EnabledStatistics::None)
3283                .with_max_data_page_size(10)
3284                .with_row_selections(),
3285        ];
3286
3287        all_options.into_iter().for_each(|opts| {
3288            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3289                for encoding in encodings {
3290                    let opts = TestOptions {
3291                        writer_version,
3292                        encoding: *encoding,
3293                        ..opts.clone()
3294                    };
3295
3296                    single_column_reader_test::<T, _, G>(
3297                        opts,
3298                        rand_max,
3299                        converted_type,
3300                        arrow_type.clone(),
3301                        &converter,
3302                    )
3303                }
3304            }
3305        });
3306    }
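    // Illustrative sketch only: callers pair a parquet `DataType` with a matching
    // `RandGen` implementation and a converter closure that builds the expected
    // Arrow array (the `MyRandGen` name below is hypothetical):
    //
    //     run_single_column_reader_tests::<Int32Type, _, MyRandGen>(
    //         4,
    //         ConvertedType::NONE,
    //         Some(ArrowDataType::Int32),
    //         |vals| Arc::new(Int32Array::from_iter(vals.iter().copied())) as ArrayRef,
    //         &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    //     );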
3307
3308    /// Create a parquet file and then read it back with a
3309    /// `ParquetRecordBatchReader`, using the parameters described in
3310    /// `opts`.
3311    fn single_column_reader_test<T, F, G>(
3312        opts: TestOptions,
3313        rand_max: i32,
3314        converted_type: ConvertedType,
3315        arrow_type: Option<ArrowDataType>,
3316        converter: F,
3317    ) where
3318        T: DataType,
3319        G: RandGen<T>,
3320        F: Fn(&[Option<T::T>]) -> ArrayRef,
3321    {
3322        // Print out options to facilitate debugging failures on CI
3323        println!(
3324            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3325            T::get_physical_type(),
3326            converted_type,
3327            arrow_type,
3328            opts
3329        );
3330
3331        // Generate def_levels according to null_percent
3332        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3333            Some(null_percent) => {
3334                let mut rng = rng();
3335
3336                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3337                    .map(|_| {
3338                        std::iter::from_fn(|| {
3339                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3340                        })
3341                        .take(opts.num_rows)
3342                        .collect()
3343                    })
3344                    .collect();
3345                (Repetition::OPTIONAL, Some(def_levels))
3346            }
3347            None => (Repetition::REQUIRED, None),
3348        };
3349
3350        // Generate random table data
3351        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3352            .map(|idx| {
3353                let null_count = match def_levels.as_ref() {
3354                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3355                    None => 0,
3356                };
3357                G::gen_vec(rand_max, opts.num_rows - null_count)
3358            })
3359            .collect();
3360
3361        let len = match T::get_physical_type() {
3362            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3363            crate::basic::Type::INT96 => 12,
3364            _ => -1,
3365        };
3366
3367        let fields = vec![Arc::new(
3368            Type::primitive_type_builder("leaf", T::get_physical_type())
3369                .with_repetition(repetition)
3370                .with_converted_type(converted_type)
3371                .with_length(len)
3372                .build()
3373                .unwrap(),
3374        )];
3375
3376        let schema = Arc::new(
3377            Type::group_type_builder("test_schema")
3378                .with_fields(fields)
3379                .build()
3380                .unwrap(),
3381        );
3382
3383        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3384
3385        let mut file = tempfile::tempfile().unwrap();
3386
3387        generate_single_column_file_with_data::<T>(
3388            &values,
3389            def_levels.as_ref(),
3390            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3391            schema,
3392            arrow_field,
3393            &opts,
3394        )
3395        .unwrap();
3396
3397        file.rewind().unwrap();
3398
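        // Enable page index reads only when page-level statistics were written.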
3399        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3400            opts.enabled_statistics == EnabledStatistics::Page,
3401        ));
3402
3403        let mut builder =
3404            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3405
3406        let expected_data = match opts.row_selections {
3407            Some((selections, row_count)) => {
3408                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3409
3410                let mut skip_data: Vec<Option<T::T>> = vec![];
3411                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3412                for select in dequeue {
3413                    if select.skip {
3414                        without_skip_data.drain(0..select.row_count);
3415                    } else {
3416                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3417                    }
3418                }
3419                builder = builder.with_row_selection(selections);
3420
3421                assert_eq!(skip_data.len(), row_count);
3422                skip_data
3423            }
3424            None => {
3425                // Get the flattened table data
3426                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3427                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3428                expected_data
3429            }
3430        };
3431
3432        let mut expected_data = match opts.row_filter {
3433            Some(filter) => {
3434                let expected_data = expected_data
3435                    .into_iter()
3436                    .zip(filter.iter())
3437                    .filter_map(|(d, f)| f.then(|| d))
3438                    .collect();
3439
3440                let mut filter_offset = 0;
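                // The predicate below is called once per decoded batch; `filter_offset`
                // tracks how far through the precomputed boolean filter we have advanced.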
3441                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3442                    ProjectionMask::all(),
3443                    move |b| {
3444                        let array = BooleanArray::from_iter(
3445                            filter
3446                                .iter()
3447                                .skip(filter_offset)
3448                                .take(b.num_rows())
3449                                .map(|x| Some(*x)),
3450                        );
3451                        filter_offset += b.num_rows();
3452                        Ok(array)
3453                    },
3454                ))]);
3455
3456                builder = builder.with_row_filter(filter);
3457                expected_data
3458            }
3459            None => expected_data,
3460        };
3461
3462        if let Some(offset) = opts.offset {
3463            builder = builder.with_offset(offset);
3464            expected_data = expected_data.into_iter().skip(offset).collect();
3465        }
3466
3467        if let Some(limit) = opts.limit {
3468            builder = builder.with_limit(limit);
3469            expected_data = expected_data.into_iter().take(limit).collect();
3470        }
3471
3472        let mut record_reader = builder
3473            .with_batch_size(opts.record_batch_size)
3474            .build()
3475            .unwrap();
3476
3477        let mut total_read = 0;
3478        loop {
3479            let maybe_batch = record_reader.next();
3480            if total_read < expected_data.len() {
3481                let end = min(total_read + opts.record_batch_size, expected_data.len());
3482                let batch = maybe_batch.unwrap().unwrap();
3483                assert_eq!(end - total_read, batch.num_rows());
3484
3485                let a = converter(&expected_data[total_read..end]);
3486                let b = batch.column(0);
3487
3488                assert_eq!(a.data_type(), b.data_type());
3489                assert_eq!(a.to_data(), b.to_data());
3490                assert_eq!(
3491                    a.as_any().type_id(),
3492                    b.as_any().type_id(),
3493                    "incorrect type ids"
3494                );
3495
3496                total_read = end;
3497            } else {
3498                assert!(maybe_batch.is_none());
3499                break;
3500            }
3501        }
3502    }
3503
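    /// Flattens the per-row-group `values` into a single list of optional values,
    /// inserting `None` wherever the corresponding definition level is 0.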
3504    fn gen_expected_data<T: DataType>(
3505        def_levels: Option<&Vec<Vec<i16>>>,
3506        values: &[Vec<T::T>],
3507    ) -> Vec<Option<T::T>> {
3508        let data: Vec<Option<T::T>> = match def_levels {
3509            Some(levels) => {
3510                let mut values_iter = values.iter().flatten();
3511                levels
3512                    .iter()
3513                    .flatten()
3514                    .map(|d| match d {
3515                        1 => Some(values_iter.next().cloned().unwrap()),
3516                        0 => None,
3517                        _ => unreachable!(),
3518                    })
3519                    .collect()
3520            }
3521            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3522        };
3523        data
3524    }
3525
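    /// Writes `values` (one inner `Vec` per row group) to `file` as a
    /// single-column parquet file, using the writer settings derived from `opts`.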
3526    fn generate_single_column_file_with_data<T: DataType>(
3527        values: &[Vec<T::T>],
3528        def_levels: Option<&Vec<Vec<i16>>>,
3529        file: File,
3530        schema: TypePtr,
3531        field: Option<Field>,
3532        opts: &TestOptions,
3533    ) -> Result<ParquetMetaData> {
3534        let mut writer_props = opts.writer_props();
3535        if let Some(field) = field {
3536            let arrow_schema = Schema::new(vec![field]);
3537            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3538        }
3539
3540        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3541
3542        for (idx, v) in values.iter().enumerate() {
3543            let def_levels = def_levels.map(|d| d[idx].as_slice());
3544            let mut row_group_writer = writer.next_row_group()?;
3545            {
3546                let mut column_writer = row_group_writer
3547                    .next_column()?
3548                    .expect("Column writer is none!");
3549
3550                column_writer
3551                    .typed::<T>()
3552                    .write_batch(v, def_levels, None)?;
3553
3554                column_writer.close()?;
3555            }
3556            row_group_writer.close()?;
3557        }
3558
3559        writer.close()
3560    }
3561
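    /// Opens `file_name` from the arrow test data directory
    /// (`arrow::util::test_util::arrow_test_data()`).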
3562    fn get_test_file(file_name: &str) -> File {
3563        let mut path = PathBuf::new();
3564        path.push(arrow::util::test_util::arrow_test_data());
3565        path.push(file_name);
3566
3567        File::open(path.as_path()).expect("File not found!")
3568    }
3569
3570    #[test]
3571    fn test_read_structs() {
3572        // This particular test file has columns of struct types where there is
3573        // a column that has the same name as one of the struct fields
3574        // (see: ARROW-11452)
3575        let testdata = arrow::util::test_util::parquet_test_data();
3576        let path = format!("{testdata}/nested_structs.rust.parquet");
3577        let file = File::open(&path).unwrap();
3578        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3579
3580        for batch in record_batch_reader {
3581            batch.unwrap();
3582        }
3583
3584        let file = File::open(&path).unwrap();
3585        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3586
3587        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3588        let projected_reader = builder
3589            .with_projection(mask)
3590            .with_batch_size(60)
3591            .build()
3592            .unwrap();
3593
3594        let expected_schema = Schema::new(vec![
3595            Field::new(
3596                "roll_num",
3597                ArrowDataType::Struct(Fields::from(vec![Field::new(
3598                    "count",
3599                    ArrowDataType::UInt64,
3600                    false,
3601                )])),
3602                false,
3603            ),
3604            Field::new(
3605                "PC_CUR",
3606                ArrowDataType::Struct(Fields::from(vec![
3607                    Field::new("mean", ArrowDataType::Int64, false),
3608                    Field::new("sum", ArrowDataType::Int64, false),
3609                ])),
3610                false,
3611            ),
3612        ]);
3613
3614        // Tests for #1652 and #1654
3615        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3616
3617        for batch in projected_reader {
3618            let batch = batch.unwrap();
3619            assert_eq!(batch.schema().as_ref(), &expected_schema);
3620        }
3621    }
3622
3623    #[test]
3624    // same as test_read_structs but constructs projection mask via column names
3625    fn test_read_structs_by_name() {
3626        let testdata = arrow::util::test_util::parquet_test_data();
3627        let path = format!("{testdata}/nested_structs.rust.parquet");
3628        let file = File::open(&path).unwrap();
3629        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3630
3631        for batch in record_batch_reader {
3632            batch.unwrap();
3633        }
3634
3635        let file = File::open(&path).unwrap();
3636        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3637
3638        let mask = ProjectionMask::columns(
3639            builder.parquet_schema(),
3640            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3641        );
3642        let projected_reader = builder
3643            .with_projection(mask)
3644            .with_batch_size(60)
3645            .build()
3646            .unwrap();
3647
3648        let expected_schema = Schema::new(vec![
3649            Field::new(
3650                "roll_num",
3651                ArrowDataType::Struct(Fields::from(vec![Field::new(
3652                    "count",
3653                    ArrowDataType::UInt64,
3654                    false,
3655                )])),
3656                false,
3657            ),
3658            Field::new(
3659                "PC_CUR",
3660                ArrowDataType::Struct(Fields::from(vec![
3661                    Field::new("mean", ArrowDataType::Int64, false),
3662                    Field::new("sum", ArrowDataType::Int64, false),
3663                ])),
3664                false,
3665            ),
3666        ]);
3667
3668        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3669
3670        for batch in projected_reader {
3671            let batch = batch.unwrap();
3672            assert_eq!(batch.schema().as_ref(), &expected_schema);
3673        }
3674    }
3675
3676    #[test]
3677    fn test_read_maps() {
3678        let testdata = arrow::util::test_util::parquet_test_data();
3679        let path = format!("{testdata}/nested_maps.snappy.parquet");
3680        let file = File::open(path).unwrap();
3681        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3682
3683        for batch in record_batch_reader {
3684            batch.unwrap();
3685        }
3686    }
3687
3688    #[test]
3689    fn test_nested_nullability() {
3690        let message_type = "message nested {
3691          OPTIONAL Group group {
3692            REQUIRED INT32 leaf;
3693          }
3694        }";
3695
3696        let file = tempfile::tempfile().unwrap();
3697        let schema = Arc::new(parse_message_type(message_type).unwrap());
3698
3699        {
3700            // Write using low-level parquet API (#1167)
3701            let mut writer =
3702                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3703                    .unwrap();
3704
3705            {
3706                let mut row_group_writer = writer.next_row_group().unwrap();
3707                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3708
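                // def_levels [0, 1, 0, 1]: rows 0 and 2 have a null `group`, while
                // rows 1 and 3 carry the leaf values 34 and 76.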
3709                column_writer
3710                    .typed::<Int32Type>()
3711                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3712                    .unwrap();
3713
3714                column_writer.close().unwrap();
3715                row_group_writer.close().unwrap();
3716            }
3717
3718            writer.close().unwrap();
3719        }
3720
3721        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3722        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3723
3724        let reader = builder.with_projection(mask).build().unwrap();
3725
3726        let expected_schema = Schema::new(vec![Field::new(
3727            "group",
3728            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3729            true,
3730        )]);
3731
3732        let batch = reader.into_iter().next().unwrap().unwrap();
3733        assert_eq!(batch.schema().as_ref(), &expected_schema);
3734        assert_eq!(batch.num_rows(), 4);
3735        assert_eq!(batch.column(0).null_count(), 2);
3736    }
3737
3738    #[test]
3739    fn test_invalid_utf8() {
3740        // a parquet file with 1 column with invalid utf8
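        // (the string value embedded below is the byte sequence b"he\xffllo"; the
        // 0xff byte is what makes it invalid)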
3741        let data = vec![
3742            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3743            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3744            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3745            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3746            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3747            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3748            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3749            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3750            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3751            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3752            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3753            82, 49,
3754        ];
3755
3756        let file = Bytes::from(data);
3757        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3758
3759        let error = record_batch_reader.next().unwrap().unwrap_err();
3760
3761        assert!(
3762            error.to_string().contains("invalid utf-8 sequence"),
3763            "{}",
3764            error
3765        );
3766    }
3767
3768    #[test]
3769    fn test_invalid_utf8_string_array() {
3770        test_invalid_utf8_string_array_inner::<i32>();
3771    }
3772
3773    #[test]
3774    fn test_invalid_utf8_large_string_array() {
3775        test_invalid_utf8_string_array_inner::<i64>();
3776    }
3777
3778    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3779        let cases = [
3780            invalid_utf8_first_char::<O>(),
3781            invalid_utf8_first_char_long_strings::<O>(),
3782            invalid_utf8_later_char::<O>(),
3783            invalid_utf8_later_char_long_strings::<O>(),
3784            invalid_utf8_later_char_really_long_strings::<O>(),
3785            invalid_utf8_later_char_really_long_strings2::<O>(),
3786        ];
3787        for array in &cases {
3788            for encoding in STRING_ENCODINGS {
3789                // The data is not valid UTF-8, so we cannot safely construct a correct
3790                // StringArray; instead, purposely create an invalid StringArray
3791                let array = unsafe {
3792                    GenericStringArray::<O>::new_unchecked(
3793                        array.offsets().clone(),
3794                        array.values().clone(),
3795                        array.nulls().cloned(),
3796                    )
3797                };
3798                let data_type = array.data_type().clone();
3799                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3800                let err = read_from_parquet(data).unwrap_err();
3801                let expected_err =
3802                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3803                assert!(
3804                    err.to_string().contains(expected_err),
3805                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3806                );
3807            }
3808        }
3809    }
3810
3811    #[test]
3812    fn test_invalid_utf8_string_view_array() {
3813        let cases = [
3814            invalid_utf8_first_char::<i32>(),
3815            invalid_utf8_first_char_long_strings::<i32>(),
3816            invalid_utf8_later_char::<i32>(),
3817            invalid_utf8_later_char_long_strings::<i32>(),
3818            invalid_utf8_later_char_really_long_strings::<i32>(),
3819            invalid_utf8_later_char_really_long_strings2::<i32>(),
3820        ];
3821
3822        for encoding in STRING_ENCODINGS {
3823            for array in &cases {
3824                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3825                let array = array.as_binary_view();
3826
3827                // The data is not valid UTF-8, so we cannot safely construct a correct
3828                // StringViewArray; instead, purposely create an invalid StringViewArray
3829                let array = unsafe {
3830                    StringViewArray::new_unchecked(
3831                        array.views().clone(),
3832                        array.data_buffers().to_vec(),
3833                        array.nulls().cloned(),
3834                    )
3835                };
3836
3837                let data_type = array.data_type().clone();
3838                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3839                let err = read_from_parquet(data).unwrap_err();
3840                let expected_err =
3841                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3842                assert!(
3843                    err.to_string().contains(expected_err),
3844                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3845                );
3846            }
3847        }
3848    }
3849
3850    /// Encodings suitable for string data
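    /// (`None` leaves the writer defaults in place, exercising the default
    /// dictionary-encoded path)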
3851    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3852        None,
3853        Some(Encoding::PLAIN),
3854        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3855        Some(Encoding::DELTA_BYTE_ARRAY),
3856    ];
3857
3858    /// Invalid UTF-8 sequence in the first character
3859    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3860    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3861
3862    /// Invalid UTF-8 sequence in a character other than the first
3863    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3864    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3865
3866    /// returns a BinaryArray with invalid UTF8 data in the first character
3867    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3868        let valid: &[u8] = b"   ";
3869        let invalid = INVALID_UTF8_FIRST_CHAR;
3870        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3871    }
3872
3873    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3874    /// string larger than 12 bytes which is handled specially when reading
3875    /// `ByteViewArray`s
3876    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3877        let valid: &[u8] = b"   ";
3878        let mut invalid = vec![];
3879        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3880        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3881        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3882    }
3883
3884    /// returns a BinaryArray with invalid UTF8 data in a character other than
3885    /// the first (this is checked in a special codepath)
3886    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3887        let valid: &[u8] = b"   ";
3888        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3889        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3890    }
3891
3892    /// returns a BinaryArray with invalid UTF8 data in a character other than
3893    /// the first in a string larger than 12 bytes which is handled specially
3894    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3895    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3896        let valid: &[u8] = b"   ";
3897        let mut invalid = vec![];
3898        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3899        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3900        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3901    }
3902
3903    /// returns a BinaryArray with invalid UTF8 data in a character other than
3904    /// the first in a string larger than 128 bytes which is handled specially
3905    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3906    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3907        let valid: &[u8] = b"   ";
3908        let mut invalid = vec![];
3909        for _ in 0..10 {
3910            // each instance is 38 bytes
3911            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3912        }
3913        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3914        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3915    }
3916
3917    /// returns a BinaryArray with a small invalid UTF8 entry followed by a
3918    /// valid entry longer than 128 bytes (exercising the long-string codepath
3919    /// used when reading `ByteViewArray`s)
3919    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3920        let valid: &[u8] = b"   ";
3921        let mut valid_long = vec![];
3922        for _ in 0..10 {
3923            // each instance is 38 bytes
3924            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3925        }
3926        let invalid = INVALID_UTF8_LATER_CHAR;
3927        GenericBinaryArray::<O>::from_iter(vec![
3928            None,
3929            Some(valid),
3930            Some(invalid),
3931            None,
3932            Some(&valid_long),
3933            Some(valid),
3934        ])
3935    }
3936
3937    /// writes the array into a single column parquet file with the specified
3938    /// encoding.
3939    ///
3940    /// If no encoding is specified, use default (dictionary) encoding
3941    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3942        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3943        let mut data = vec![];
3944        let schema = batch.schema();
3945        let props = encoding.map(|encoding| {
3946            WriterProperties::builder()
3947                // must disable dictionary encoding so the requested encoding is actually used
3948                .set_dictionary_enabled(false)
3949                .set_encoding(encoding)
3950                .build()
3951        });
3952
3953        {
3954            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3955            writer.write(&batch).unwrap();
3956            writer.flush().unwrap();
3957            writer.close().unwrap();
3958        };
3959        data
3960    }
3961
3962    /// read the parquet file into a record batch
3963    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3964        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3965            .unwrap()
3966            .build()
3967            .unwrap();
3968
3969        reader.collect()
3970    }
3971
3972    #[test]
3973    fn test_dictionary_preservation() {
3974        let fields = vec![Arc::new(
3975            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3976                .with_repetition(Repetition::OPTIONAL)
3977                .with_converted_type(ConvertedType::UTF8)
3978                .build()
3979                .unwrap(),
3980        )];
3981
3982        let schema = Arc::new(
3983            Type::group_type_builder("test_schema")
3984                .with_fields(fields)
3985                .build()
3986                .unwrap(),
3987        );
3988
3989        let dict_type = ArrowDataType::Dictionary(
3990            Box::new(ArrowDataType::Int32),
3991            Box::new(ArrowDataType::Utf8),
3992        );
3993
3994        let arrow_field = Field::new("leaf", dict_type, true);
3995
3996        let mut file = tempfile::tempfile().unwrap();
3997
3998        let values = vec![
3999            vec![
4000                ByteArray::from("hello"),
4001                ByteArray::from("a"),
4002                ByteArray::from("b"),
4003                ByteArray::from("d"),
4004            ],
4005            vec![
4006                ByteArray::from("c"),
4007                ByteArray::from("a"),
4008                ByteArray::from("b"),
4009            ],
4010        ];
4011
4012        let def_levels = vec![
4013            vec![1, 0, 0, 1, 0, 0, 1, 1],
4014            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
4015        ];
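        // Row group 0: 8 rows with 4 non-null values ("hello", "a", "b", "d");
        // row group 1: 9 rows with 3 non-null values ("c", "a", "b"). 17 rows in
        // total, read back below as six batches of at most 3 rows.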
4016
4017        let opts = TestOptions {
4018            encoding: Encoding::RLE_DICTIONARY,
4019            ..Default::default()
4020        };
4021
4022        generate_single_column_file_with_data::<ByteArrayType>(
4023            &values,
4024            Some(&def_levels),
4025            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
4026            schema,
4027            Some(arrow_field),
4028            &opts,
4029        )
4030        .unwrap();
4031
4032        file.rewind().unwrap();
4033
4034        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
4035
4036        let batches = record_reader
4037            .collect::<Result<Vec<RecordBatch>, _>>()
4038            .unwrap();
4039
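        // 8 + 9 = 17 rows read with a batch size of 3 produce six batches; the third
        // batch spans the row group boundary (two rows from the first group, one from the second)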
4040        assert_eq!(batches.len(), 6);
4041        assert!(batches.iter().all(|x| x.num_columns() == 1));
4042
4043        let row_counts = batches
4044            .iter()
4045            .map(|x| (x.num_rows(), x.column(0).null_count()))
4046            .collect::<Vec<_>>();
4047
4048        assert_eq!(
4049            row_counts,
4050            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
4051        );
4052
4053        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
4054
4055        // First and second batch in same row group -> same dictionary
4056        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
4057        // Third batch spans the row group boundary -> dictionary is recomputed
4058        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
4059        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
4060        // Fourth, fifth and sixth from same row group -> same dictionary
4061        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
4062        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
4063    }
4064
4065    #[test]
4066    fn test_read_null_list() {
4067        let testdata = arrow::util::test_util::parquet_test_data();
4068        let path = format!("{testdata}/null_list.parquet");
4069        let file = File::open(path).unwrap();
4070        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
4071
4072        let batch = record_batch_reader.next().unwrap().unwrap();
4073        assert_eq!(batch.num_rows(), 1);
4074        assert_eq!(batch.num_columns(), 1);
4075        assert_eq!(batch.column(0).len(), 1);
4076
4077        let list = batch
4078            .column(0)
4079            .as_any()
4080            .downcast_ref::<ListArray>()
4081            .unwrap();
4082        assert_eq!(list.len(), 1);
4083        assert!(list.is_valid(0));
4084
4085        let val = list.value(0);
4086        assert_eq!(val.len(), 0);
4087    }
4088
4089    #[test]
4090    fn test_null_schema_inference() {
4091        let testdata = arrow::util::test_util::parquet_test_data();
4092        let path = format!("{testdata}/null_list.parquet");
4093        let file = File::open(path).unwrap();
4094
4095        let arrow_field = Field::new(
4096            "emptylist",
4097            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
4098            true,
4099        );
4100
4101        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4102        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
4103        let schema = builder.schema();
4104        assert_eq!(schema.fields().len(), 1);
4105        assert_eq!(schema.field(0), &arrow_field);
4106    }
4107
4108    #[test]
4109    fn test_skip_metadata() {
4110        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
4111        let field = Field::new("col", col.data_type().clone(), true);
4112
4113        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
4114
4115        let metadata = [("key".to_string(), "value".to_string())]
4116            .into_iter()
4117            .collect();
4118
4119        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
4120
4121        assert_ne!(schema_with_metadata, schema_without_metadata);
4122
4123        let batch =
4124            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
4125
4126        let file = |version: WriterVersion| {
4127            let props = WriterProperties::builder()
4128                .set_writer_version(version)
4129                .build();
4130
4131            let file = tempfile().unwrap();
4132            let mut writer =
4133                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
4134                    .unwrap();
4135            writer.write(&batch).unwrap();
4136            writer.close().unwrap();
4137            file
4138        };
4139
4140        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4141
4142        let v1_reader = file(WriterVersion::PARQUET_1_0);
4143        let v2_reader = file(WriterVersion::PARQUET_2_0);
4144
4145        let arrow_reader =
4146            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
4147        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4148
4149        let reader =
4150            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
4151                .unwrap()
4152                .build()
4153                .unwrap();
4154        assert_eq!(reader.schema(), schema_without_metadata);
4155
4156        let arrow_reader =
4157            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
4158        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4159
4160        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4161            .unwrap()
4162            .build()
4163            .unwrap();
4164        assert_eq!(reader.schema(), schema_without_metadata);
4165    }
4166
4167    fn write_parquet_from_iter<I, F>(value: I) -> File
4168    where
4169        I: IntoIterator<Item = (F, ArrayRef)>,
4170        F: AsRef<str>,
4171    {
4172        let batch = RecordBatch::try_from_iter(value).unwrap();
4173        let file = tempfile().unwrap();
4174        let mut writer =
4175            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4176        writer.write(&batch).unwrap();
4177        writer.close().unwrap();
4178        file
4179    }
4180
4181    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4182    where
4183        I: IntoIterator<Item = (F, ArrayRef)>,
4184        F: AsRef<str>,
4185    {
4186        let file = write_parquet_from_iter(value);
4187        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4188        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4189            file.try_clone().unwrap(),
4190            options_with_schema,
4191        );
4192        assert_eq!(builder.err().unwrap().to_string(), expected_error);
4193    }
4194
4195    #[test]
4196    fn test_schema_too_few_columns() {
4197        run_schema_test_with_error(
4198            vec![
4199                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4200                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4201            ],
4202            Arc::new(Schema::new(vec![Field::new(
4203                "int64",
4204                ArrowDataType::Int64,
4205                false,
4206            )])),
4207            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4208        );
4209    }
4210
4211    #[test]
4212    fn test_schema_too_many_columns() {
4213        run_schema_test_with_error(
4214            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4215            Arc::new(Schema::new(vec![
4216                Field::new("int64", ArrowDataType::Int64, false),
4217                Field::new("int32", ArrowDataType::Int32, false),
4218            ])),
4219            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4220        );
4221    }
4222
4223    #[test]
4224    fn test_schema_mismatched_column_names() {
4225        run_schema_test_with_error(
4226            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4227            Arc::new(Schema::new(vec![Field::new(
4228                "other",
4229                ArrowDataType::Int64,
4230                false,
4231            )])),
4232            "Arrow: incompatible arrow schema, expected field named int64 got other",
4233        );
4234    }
4235
4236    #[test]
4237    fn test_schema_incompatible_columns() {
4238        run_schema_test_with_error(
4239            vec![
4240                (
4241                    "col1_invalid",
4242                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4243                ),
4244                (
4245                    "col2_valid",
4246                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4247                ),
4248                (
4249                    "col3_invalid",
4250                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4251                ),
4252            ],
4253            Arc::new(Schema::new(vec![
4254                Field::new("col1_invalid", ArrowDataType::Int32, false),
4255                Field::new("col2_valid", ArrowDataType::Int32, false),
4256                Field::new("col3_invalid", ArrowDataType::Int32, false),
4257            ])),
4258            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4259        );
4260    }
4261
4262    #[test]
4263    fn test_one_incompatible_nested_column() {
4264        let nested_fields = Fields::from(vec![
4265            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4266            Field::new("nested1_invalid", ArrowDataType::Int64, false),
4267        ]);
4268        let nested = StructArray::try_new(
4269            nested_fields,
4270            vec![
4271                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4272                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4273            ],
4274            None,
4275        )
4276        .expect("struct array");
4277        let supplied_nested_fields = Fields::from(vec![
4278            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4279            Field::new("nested1_invalid", ArrowDataType::Int32, false),
4280        ]);
4281        run_schema_test_with_error(
4282            vec![
4283                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4284                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4285                ("nested", Arc::new(nested) as ArrayRef),
4286            ],
4287            Arc::new(Schema::new(vec![
4288                Field::new("col1", ArrowDataType::Int64, false),
4289                Field::new("col2", ArrowDataType::Int32, false),
4290                Field::new(
4291                    "nested",
4292                    ArrowDataType::Struct(supplied_nested_fields),
4293                    false,
4294                ),
4295            ])),
4296            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4297            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4298            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4299        );
4300    }
4301
4302    /// Return parquet data with a single column of utf8 strings
4303    fn utf8_parquet() -> Bytes {
4304        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4305        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4306        let props = None;
4307        // write a parquet file with non-nullable strings
4308        let mut parquet_data = vec![];
4309        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4310        writer.write(&batch).unwrap();
4311        writer.close().unwrap();
4312        Bytes::from(parquet_data)
4313    }
4314
4315    #[test]
4316    fn test_schema_error_bad_types() {
4317        // verify incompatible schemas error on read
4318        let parquet_data = utf8_parquet();
4319
4320        // Ask to read it back with an incompatible schema (int vs string)
4321        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4322            "column1",
4323            arrow::datatypes::DataType::Int32,
4324            false,
4325        )]));
4326
4327        // read it back out
4328        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4329        let err =
4330            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4331                .unwrap_err();
4332        assert_eq!(
4333            err.to_string(),
4334            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4335        )
4336    }
4337
4338    #[test]
4339    fn test_schema_error_bad_nullability() {
4340        // verify incompatible schemas error on read
4341        let parquet_data = utf8_parquet();
4342
4343        // Ask to read it back with an incompatible schema (nullability mismatch)
4344        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4345            "column1",
4346            arrow::datatypes::DataType::Utf8,
4347            true,
4348        )]));
4349
4350        // read it back out
4351        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4352        let err =
4353            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4354                .unwrap_err();
4355        assert_eq!(
4356            err.to_string(),
4357            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4358        )
4359    }
4360
4361    #[test]
4362    fn test_read_binary_as_utf8() {
4363        let file = write_parquet_from_iter(vec![
4364            (
4365                "binary_to_utf8",
4366                Arc::new(BinaryArray::from(vec![
4367                    b"one".as_ref(),
4368                    b"two".as_ref(),
4369                    b"three".as_ref(),
4370                ])) as ArrayRef,
4371            ),
4372            (
4373                "large_binary_to_large_utf8",
4374                Arc::new(LargeBinaryArray::from(vec![
4375                    b"one".as_ref(),
4376                    b"two".as_ref(),
4377                    b"three".as_ref(),
4378                ])) as ArrayRef,
4379            ),
4380            (
4381                "binary_view_to_utf8_view",
4382                Arc::new(BinaryViewArray::from(vec![
4383                    b"one".as_ref(),
4384                    b"two".as_ref(),
4385                    b"three".as_ref(),
4386                ])) as ArrayRef,
4387            ),
4388        ]);
4389        let supplied_fields = Fields::from(vec![
4390            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4391            Field::new(
4392                "large_binary_to_large_utf8",
4393                ArrowDataType::LargeUtf8,
4394                false,
4395            ),
4396            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4397        ]);
4398
4399        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4400        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4401            file.try_clone().unwrap(),
4402            options,
4403        )
4404        .expect("reader builder with schema")
4405        .build()
4406        .expect("reader with schema");
4407
4408        let batch = arrow_reader.next().unwrap().unwrap();
4409        assert_eq!(batch.num_columns(), 3);
4410        assert_eq!(batch.num_rows(), 3);
4411        assert_eq!(
4412            batch
4413                .column(0)
4414                .as_string::<i32>()
4415                .iter()
4416                .collect::<Vec<_>>(),
4417            vec![Some("one"), Some("two"), Some("three")]
4418        );
4419
4420        assert_eq!(
4421            batch
4422                .column(1)
4423                .as_string::<i64>()
4424                .iter()
4425                .collect::<Vec<_>>(),
4426            vec![Some("one"), Some("two"), Some("three")]
4427        );
4428
4429        assert_eq!(
4430            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4431            vec![Some("one"), Some("two"), Some("three")]
4432        );
4433    }
4434
4435    #[test]
4436    #[should_panic(expected = "Invalid UTF8 sequence at")]
4437    fn test_read_non_utf8_binary_as_utf8() {
4438        let file = write_parquet_from_iter(vec![(
4439            "non_utf8_binary",
4440            Arc::new(BinaryArray::from(vec![
4441                b"\xDE\x00\xFF".as_ref(),
4442                b"\xDE\x01\xAA".as_ref(),
4443                b"\xDE\x02\xFF".as_ref(),
4444            ])) as ArrayRef,
4445        )]);
4446        let supplied_fields = Fields::from(vec![Field::new(
4447            "non_utf8_binary",
4448            ArrowDataType::Utf8,
4449            false,
4450        )]);
4451
4452        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4453        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4454            file.try_clone().unwrap(),
4455            options,
4456        )
4457        .expect("reader builder with schema")
4458        .build()
4459        .expect("reader with schema");
4460        arrow_reader.next().unwrap().unwrap_err();
4461    }
4462
4463    #[test]
4464    fn test_with_schema() {
4465        let nested_fields = Fields::from(vec![
4466            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4467            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4468        ]);
4469
4470        let nested_arrays: Vec<ArrayRef> = vec![
4471            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4472            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4473        ];
4474
4475        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4476
4477        let file = write_parquet_from_iter(vec![
4478            (
4479                "int32_to_ts_second",
4480                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4481            ),
4482            (
4483                "date32_to_date64",
4484                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4485            ),
4486            ("nested", Arc::new(nested) as ArrayRef),
4487        ]);
4488
4489        let supplied_nested_fields = Fields::from(vec![
4490            Field::new(
4491                "utf8_to_dict",
4492                ArrowDataType::Dictionary(
4493                    Box::new(ArrowDataType::Int32),
4494                    Box::new(ArrowDataType::Utf8),
4495                ),
4496                false,
4497            ),
4498            Field::new(
4499                "int64_to_ts_nano",
4500                ArrowDataType::Timestamp(
4501                    arrow::datatypes::TimeUnit::Nanosecond,
4502                    Some("+10:00".into()),
4503                ),
4504                false,
4505            ),
4506        ]);
4507
4508        let supplied_schema = Arc::new(Schema::new(vec![
4509            Field::new(
4510                "int32_to_ts_second",
4511                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4512                false,
4513            ),
4514            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4515            Field::new(
4516                "nested",
4517                ArrowDataType::Struct(supplied_nested_fields),
4518                false,
4519            ),
4520        ]));
4521
4522        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4523        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4524            file.try_clone().unwrap(),
4525            options,
4526        )
4527        .expect("reader builder with schema")
4528        .build()
4529        .expect("reader with schema");
4530
4531        assert_eq!(arrow_reader.schema(), supplied_schema);
4532        let batch = arrow_reader.next().unwrap().unwrap();
4533        assert_eq!(batch.num_columns(), 3);
4534        assert_eq!(batch.num_rows(), 4);
4535        assert_eq!(
4536            batch
4537                .column(0)
4538                .as_any()
4539                .downcast_ref::<TimestampSecondArray>()
4540                .expect("downcast to timestamp second")
4541                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4542                .map(|v| v.to_string())
4543                .expect("value as datetime"),
4544            "1970-01-01 01:00:00 +01:00"
4545        );
4546        assert_eq!(
4547            batch
4548                .column(1)
4549                .as_any()
4550                .downcast_ref::<Date64Array>()
4551                .expect("downcast to date64")
4552                .value_as_date(0)
4553                .map(|v| v.to_string())
4554                .expect("value as date"),
4555            "1970-01-01"
4556        );
4557
4558        let nested = batch
4559            .column(2)
4560            .as_any()
4561            .downcast_ref::<StructArray>()
4562            .expect("downcast to struct");
4563
4564        let nested_dict = nested
4565            .column(0)
4566            .as_any()
4567            .downcast_ref::<Int32DictionaryArray>()
4568            .expect("downcast to dictionary");
4569
4570        assert_eq!(
4571            nested_dict
4572                .values()
4573                .as_any()
4574                .downcast_ref::<StringArray>()
4575                .expect("downcast to string")
4576                .iter()
4577                .collect::<Vec<_>>(),
4578            vec![Some("a"), Some("b")]
4579        );
4580
4581        assert_eq!(
4582            nested_dict.keys().iter().collect::<Vec<_>>(),
4583            vec![Some(0), Some(0), Some(0), Some(1)]
4584        );
4585
4586        assert_eq!(
4587            nested
4588                .column(1)
4589                .as_any()
4590                .downcast_ref::<TimestampNanosecondArray>()
4591                .expect("downcast to timestamp nanosecond")
4592                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4593                .map(|v| v.to_string())
4594                .expect("value as datetime"),
4595            "1970-01-01 10:00:00.000000001 +10:00"
4596        );
4597    }
4598
4599    #[test]
4600    fn test_empty_projection() {
4601        let testdata = arrow::util::test_util::parquet_test_data();
4602        let path = format!("{testdata}/alltypes_plain.parquet");
4603        let file = File::open(path).unwrap();
4604
4605        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4606        let file_metadata = builder.metadata().file_metadata();
4607        let expected_rows = file_metadata.num_rows() as usize;
4608
4609        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4610        let batch_reader = builder
4611            .with_projection(mask)
4612            .with_batch_size(2)
4613            .build()
4614            .unwrap();
4615
4616        let mut total_rows = 0;
4617        for maybe_batch in batch_reader {
4618            let batch = maybe_batch.unwrap();
4619            total_rows += batch.num_rows();
4620            assert_eq!(batch.num_columns(), 0);
4621            assert!(batch.num_rows() <= 2);
4622        }
4623
4624        assert_eq!(total_rows, expected_rows);
4625    }
4626
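    /// Writes two batches of `batch_size` list rows with a maximum row group size of
    /// `row_group_size`, then reads them back with the same batch size and checks that
    /// both returned batches contain exactly `batch_size` rows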
4627    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4628        let schema = Arc::new(Schema::new(vec![Field::new(
4629            "list",
4630            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4631            true,
4632        )]));
4633
4634        let mut buf = Vec::with_capacity(1024);
4635
4636        let mut writer = ArrowWriter::try_new(
4637            &mut buf,
4638            schema.clone(),
4639            Some(
4640                WriterProperties::builder()
4641                    .set_max_row_group_size(row_group_size)
4642                    .build(),
4643            ),
4644        )
4645        .unwrap();
4646        for _ in 0..2 {
4647            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4648            for _ in 0..(batch_size) {
4649                list_builder.append(true);
4650            }
4651            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4652                .unwrap();
4653            writer.write(&batch).unwrap();
4654        }
4655        writer.close().unwrap();
4656
4657        let mut record_reader =
4658            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4659        assert_eq!(
4660            batch_size,
4661            record_reader.next().unwrap().unwrap().num_rows()
4662        );
4663        assert_eq!(
4664            batch_size,
4665            record_reader.next().unwrap().unwrap().num_rows()
4666        );
4667    }
4668
4669    #[test]
4670    fn test_row_group_exact_multiple() {
4671        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4672        test_row_group_batch(8, 8);
4673        test_row_group_batch(10, 8);
4674        test_row_group_batch(8, 10);
4675        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4676        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4677        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4678        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4679        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4680    }
4681
4682    /// Given a RecordBatch containing all the column data, return the batches expected
4683    /// for a given `batch_size` and `selection`
4684    fn get_expected_batches(
4685        column: &RecordBatch,
4686        selection: &RowSelection,
4687        batch_size: usize,
4688    ) -> Vec<RecordBatch> {
4689        let mut expected_batches = vec![];
4690
4691        let mut selection: VecDeque<_> = selection.clone().into();
4692        let mut row_offset = 0;
4693        let mut last_start = None;
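        // Walk the selection in chunks of `batch_size` rows, remembering where the
        // current run of selected rows started; a skip (or reaching the end of the
        // selection) closes the run and pushes the corresponding slice of `column`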
4694        while row_offset < column.num_rows() && !selection.is_empty() {
4695            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4696            while batch_remaining > 0 && !selection.is_empty() {
4697                let (to_read, skip) = match selection.front_mut() {
4698                    Some(selection) if selection.row_count > batch_remaining => {
4699                        selection.row_count -= batch_remaining;
4700                        (batch_remaining, selection.skip)
4701                    }
4702                    Some(_) => {
4703                        let select = selection.pop_front().unwrap();
4704                        (select.row_count, select.skip)
4705                    }
4706                    None => break,
4707                };
4708
4709                batch_remaining -= to_read;
4710
4711                match skip {
4712                    true => {
4713                        if let Some(last_start) = last_start.take() {
4714                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4715                        }
4716                        row_offset += to_read
4717                    }
4718                    false => {
4719                        last_start.get_or_insert(row_offset);
4720                        row_offset += to_read
4721                    }
4722                }
4723            }
4724        }
4725
4726        if let Some(last_start) = last_start.take() {
4727            expected_batches.push(column.slice(last_start, row_offset - last_start))
4728        }
4729
4730        // Sanity check: all batches except the final one should have exactly `batch_size` rows
4731        for batch in &expected_batches[..expected_batches.len() - 1] {
4732            assert_eq!(batch.num_rows(), batch_size);
4733        }
4734
4735        expected_batches
4736    }
4737
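    /// Builds a `RowSelection` of alternating selected and skipped runs of `step_len`
    /// rows (starting with a skip when `skip_first` is true), returning it together
    /// with the total number of selected rows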
4738    fn create_test_selection(
4739        step_len: usize,
4740        total_len: usize,
4741        skip_first: bool,
4742    ) -> (RowSelection, usize) {
4743        let mut remaining = total_len;
4744        let mut skip = skip_first;
4745        let mut vec = vec![];
4746        let mut selected_count = 0;
4747        while remaining != 0 {
4748            let step = if remaining > step_len {
4749                step_len
4750            } else {
4751                remaining
4752            };
4753            vec.push(RowSelector {
4754                row_count: step,
4755                skip,
4756            });
4757            remaining -= step;
4758            if !skip {
4759                selected_count += step;
4760            }
4761            skip = !skip;
4762        }
4763        (vec.into(), selected_count)
4764    }
4765
4766    #[test]
4767    fn test_scan_row_with_selection() {
4768        let testdata = arrow::util::test_util::parquet_test_data();
4769        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4770        let test_file = File::open(&path).unwrap();
4771
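        // Read all 7300 rows as a single batch to serve as the reference data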
4772        let mut serial_reader =
4773            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4774        let data = serial_reader.next().unwrap().unwrap();
4775
4776        let do_test = |batch_size: usize, selection_len: usize| {
4777            for skip_first in [false, true] {
4778                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4779
4780                let expected = get_expected_batches(&data, &selections, batch_size);
4781                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4782                assert_eq!(
4783                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4784                    expected,
4785                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4786                );
4787            }
4788        };
4789
4790        // total row count 7300
4791        // 1. test selection len more than one page row count
4792        do_test(1000, 1000);
4793
4794        // 2. test selection len less than one page row count
4795        do_test(20, 20);
4796
4797        // 3. test selection_len less than batch_size
4798        do_test(20, 5);
4799
4800        // 4. test selection_len more than batch_size
4801        //    (i.e. the case where batch_size < selection_len)
4802        do_test(20, 5);
4803
4804        fn create_skip_reader(
4805            test_file: &File,
4806            batch_size: usize,
4807            selections: RowSelection,
4808        ) -> ParquetRecordBatchReader {
4809            let options =
4810                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4811            let file = test_file.try_clone().unwrap();
4812            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4813                .unwrap()
4814                .with_batch_size(batch_size)
4815                .with_row_selection(selections)
4816                .build()
4817                .unwrap()
4818        }
4819    }
4820
4821    #[test]
4822    fn test_batch_size_overallocate() {
4823        let testdata = arrow::util::test_util::parquet_test_data();
4824        // `alltypes_plain.parquet` only has 8 rows
4825        let path = format!("{testdata}/alltypes_plain.parquet");
4826        let test_file = File::open(path).unwrap();
4827
4828        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4829        let num_rows = builder.metadata.file_metadata().num_rows();
4830        let reader = builder
4831            .with_batch_size(1024)
4832            .with_projection(ProjectionMask::all())
4833            .build()
4834            .unwrap();
4835        assert_ne!(1024, num_rows);
4836        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4837    }
4838
4839    #[test]
4840    fn test_read_with_page_index_enabled() {
4841        let testdata = arrow::util::test_util::parquet_test_data();
4842
4843        {
4844            // `alltypes_tiny_pages.parquet` has page index
4845            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4846            let test_file = File::open(path).unwrap();
4847            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4848                test_file,
4849                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4850            )
4851            .unwrap();
4852            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4853            let reader = builder.build().unwrap();
4854            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4855            assert_eq!(batches.len(), 8);
4856        }
4857
4858        {
4859            // `alltypes_plain.parquet` doesn't have page index
4860            let path = format!("{testdata}/alltypes_plain.parquet");
4861            let test_file = File::open(path).unwrap();
4862            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4863                test_file,
4864                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4865            )
4866            .unwrap();
4867            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4868            // we should still read the file successfully.
4869            assert!(builder.metadata().offset_index().is_none());
4870            let reader = builder.build().unwrap();
4871            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4872            assert_eq!(batches.len(), 1);
4873        }
4874    }
4875
4876    #[test]
4877    fn test_raw_repetition() {
4878        const MESSAGE_TYPE: &str = "
4879            message Log {
4880              OPTIONAL INT32 eventType;
4881              REPEATED INT32 category;
4882              REPEATED group filter {
4883                OPTIONAL INT32 error;
4884              }
4885            }
4886        ";
4887        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4888        let props = Default::default();
4889
4890        let mut buf = Vec::with_capacity(1024);
4891        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4892        let mut row_group_writer = writer.next_row_group().unwrap();
4893
4894        // column 0
4895        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4896        col_writer
4897            .typed::<Int32Type>()
4898            .write_batch(&[1], Some(&[1]), None)
4899            .unwrap();
4900        col_writer.close().unwrap();
4901        // column 1: two values in the same row (repetition levels [0, 1])
4902        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4903        col_writer
4904            .typed::<Int32Type>()
4905            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4906            .unwrap();
4907        col_writer.close().unwrap();
4908        // column 2
4909        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4910        col_writer
4911            .typed::<Int32Type>()
4912            .write_batch(&[1], Some(&[1]), Some(&[0]))
4913            .unwrap();
4914        col_writer.close().unwrap();
4915
4916        let rg_md = row_group_writer.close().unwrap();
4917        assert_eq!(rg_md.num_rows(), 1);
4918        writer.close().unwrap();
4919
4920        let bytes = Bytes::from(buf);
4921
4922        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4923        let full = no_mask.next().unwrap().unwrap();
4924
4925        assert_eq!(full.num_columns(), 3);
4926
4927        for idx in 0..3 {
4928            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4929            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4930            let mut reader = b.with_projection(mask).build().unwrap();
4931            let projected = reader.next().unwrap().unwrap();
4932
4933            assert_eq!(projected.num_columns(), 1);
4934            assert_eq!(full.column(idx), projected.column(0));
4935        }
4936    }
4937
4938    #[test]
4939    fn test_read_lz4_raw() {
4940        let testdata = arrow::util::test_util::parquet_test_data();
4941        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4942        let file = File::open(path).unwrap();
4943
4944        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4945            .unwrap()
4946            .collect::<Result<Vec<_>, _>>()
4947            .unwrap();
4948        assert_eq!(batches.len(), 1);
4949        let batch = &batches[0];
4950
4951        assert_eq!(batch.num_columns(), 3);
4952        assert_eq!(batch.num_rows(), 4);
4953
4954        // https://github.com/apache/parquet-testing/pull/18
4955        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4956        assert_eq!(
4957            a.values(),
4958            &[1593604800, 1593604800, 1593604801, 1593604801]
4959        );
4960
4961        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4962        let a: Vec<_> = a.iter().flatten().collect();
4963        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4964
4965        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4966        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4967    }
4968
4969    // This test ensures backward compatibility. It tests 2 files that both use the LZ4
4970    // CompressionCodec but different algorithms: LZ4_HADOOP and LZ4_RAW.
4971    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec which uses
4972    //    the LZ4_HADOOP algorithm for compression.
4973    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec which uses
4974    //    the LZ4_RAW algorithm for compression. This fallback is done to keep backward
4975    //    compatibility with older parquet-cpp versions.
4976    //
4977    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4978    #[test]
4979    fn test_read_lz4_hadoop_fallback() {
4980        for file in [
4981            "hadoop_lz4_compressed.parquet",
4982            "non_hadoop_lz4_compressed.parquet",
4983        ] {
4984            let testdata = arrow::util::test_util::parquet_test_data();
4985            let path = format!("{testdata}/{file}");
4986            let file = File::open(path).unwrap();
4987            let expected_rows = 4;
4988
4989            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4990                .unwrap()
4991                .collect::<Result<Vec<_>, _>>()
4992                .unwrap();
4993            assert_eq!(batches.len(), 1);
4994            let batch = &batches[0];
4995
4996            assert_eq!(batch.num_columns(), 3);
4997            assert_eq!(batch.num_rows(), expected_rows);
4998
4999            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
5000            assert_eq!(
5001                a.values(),
5002                &[1593604800, 1593604800, 1593604801, 1593604801]
5003            );
5004
5005            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
5006            let b: Vec<_> = b.iter().flatten().collect();
5007            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
5008
5009            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
5010            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
5011        }
5012    }
5013
5014    #[test]
5015    fn test_read_lz4_hadoop_large() {
5016        let testdata = arrow::util::test_util::parquet_test_data();
5017        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
5018        let file = File::open(path).unwrap();
5019        let expected_rows = 10000;
5020
5021        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
5022            .unwrap()
5023            .collect::<Result<Vec<_>, _>>()
5024            .unwrap();
5025        assert_eq!(batches.len(), 1);
5026        let batch = &batches[0];
5027
5028        assert_eq!(batch.num_columns(), 1);
5029        assert_eq!(batch.num_rows(), expected_rows);
5030
5031        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
5032        let a: Vec<_> = a.iter().flatten().collect();
5033        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
5034        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
5035        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
5036        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
5037    }
5038
5039    #[test]
5040    #[cfg(feature = "snap")]
5041    fn test_read_nested_lists() {
5042        let testdata = arrow::util::test_util::parquet_test_data();
5043        let path = format!("{testdata}/nested_lists.snappy.parquet");
5044        let file = File::open(path).unwrap();
5045
5046        let f = file.try_clone().unwrap();
5047        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
5048        let expected = reader.next().unwrap().unwrap();
5049        assert_eq!(expected.num_rows(), 3);
5050
5051        let selection = RowSelection::from(vec![
5052            RowSelector::skip(1),
5053            RowSelector::select(1),
5054            RowSelector::skip(1),
5055        ]);
5056        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5057            .unwrap()
5058            .with_row_selection(selection)
5059            .build()
5060            .unwrap();
5061
5062        let actual = reader.next().unwrap().unwrap();
5063        assert_eq!(actual.num_rows(), 1);
5064        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
5065    }
5066
5067    #[test]
5068    fn test_arbitrary_decimal() {
5069        let values = [1, 2, 3, 4, 5, 6, 7, 8];
5070        let decimals_19_0 = Decimal128Array::from_iter_values(values)
5071            .with_precision_and_scale(19, 0)
5072            .unwrap();
5073        let decimals_12_0 = Decimal128Array::from_iter_values(values)
5074            .with_precision_and_scale(12, 0)
5075            .unwrap();
5076        let decimals_17_10 = Decimal128Array::from_iter_values(values)
5077            .with_precision_and_scale(17, 10)
5078            .unwrap();
5079
5080        let written = RecordBatch::try_from_iter([
5081            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
5082            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
5083            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
5084        ])
5085        .unwrap();
5086
5087        let mut buffer = Vec::with_capacity(1024);
5088        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
5089        writer.write(&written).unwrap();
5090        writer.close().unwrap();
5091
5092        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
5093            .unwrap()
5094            .collect::<Result<Vec<_>, _>>()
5095            .unwrap();
5096
5097        assert_eq!(&written.slice(0, 8), &read[0]);
5098    }
5099
5100    #[test]
5101    fn test_list_skip() {
5102        let mut list = ListBuilder::new(Int32Builder::new());
5103        list.append_value([Some(1), Some(2)]);
5104        list.append_value([Some(3)]);
5105        list.append_value([Some(4)]);
5106        let list = list.finish();
5107        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
5108
5109        // First page contains 2 values but only 1 row
5110        let props = WriterProperties::builder()
5111            .set_data_page_row_count_limit(1)
5112            .set_write_batch_size(2)
5113            .build();
5114
5115        let mut buffer = Vec::with_capacity(1024);
5116        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
5117        writer.write(&batch).unwrap();
5118        writer.close().unwrap();
5119
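        // Skip the first two rows (covering the entire short first page) and read only the last row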
5120        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
5121        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
5122            .unwrap()
5123            .with_row_selection(selection.into())
5124            .build()
5125            .unwrap();
5126        let out = reader.next().unwrap().unwrap();
5127        assert_eq!(out.num_rows(), 1);
5128        assert_eq!(out, batch.slice(2, 1));
5129    }
5130
5131    #[test]
5132    fn test_row_selection_interleaved_skip() -> Result<()> {
5133        let schema = Arc::new(Schema::new(vec![Field::new(
5134            "v",
5135            ArrowDataType::Int32,
5136            false,
5137        )]));
5138
5139        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
5140        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
5141
5142        let mut buffer = Vec::with_capacity(1024);
5143        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
5144        writer.write(&batch)?;
5145        writer.close()?;
5146
5147        let selection = RowSelection::from(vec![
5148            RowSelector::select(1),
5149            RowSelector::skip(2),
5150            RowSelector::select(2),
5151        ]);
5152
5153        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
5154            .with_batch_size(4)
5155            .with_row_selection(selection)
5156            .build()?;
5157
5158        let out = reader.next().unwrap()?;
5159        assert_eq!(out.num_rows(), 3);
5160        let values = out
5161            .column(0)
5162            .as_primitive::<arrow_array::types::Int32Type>()
5163            .values();
5164        assert_eq!(values, &[0, 3, 4]);
5165        assert!(reader.next().is_none());
5166        Ok(())
5167    }
5168
5169    #[test]
5170    fn test_row_selection_mask_sparse_rows() -> Result<()> {
5171        let schema = Arc::new(Schema::new(vec![Field::new(
5172            "v",
5173            ArrowDataType::Int32,
5174            false,
5175        )]));
5176
5177        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
5178        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
5179
5180        let mut buffer = Vec::with_capacity(1024);
5181        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
5182        writer.write(&batch)?;
5183        writer.close()?;
5184
5185        let total_rows = batch.num_rows();
5186        let ranges = (1..total_rows)
5187            .step_by(2)
5188            .map(|i| i..i + 1)
5189            .collect::<Vec<_>>();
5190        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
5191
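        // Selecting every other row produces a selector for nearly every row,
        // i.e. far fewer than 8 rows per selector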
5192        let selectors: Vec<RowSelector> = selection.clone().into();
5193        assert!(total_rows < selectors.len() * 8);
5194
5195        let bytes = Bytes::from(buffer);
5196
5197        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
5198            .with_batch_size(7)
5199            .with_row_selection(selection)
5200            .build()?;
5201
5202        let mut collected = Vec::new();
5203        for batch in reader {
5204            let batch = batch?;
5205            collected.extend_from_slice(
5206                batch
5207                    .column(0)
5208                    .as_primitive::<arrow_array::types::Int32Type>()
5209                    .values(),
5210            );
5211        }
5212
5213        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
5214        assert_eq!(collected, expected);
5215        Ok(())
5216    }
5217
5218    fn test_decimal32_roundtrip() {
5219        let d = |values: Vec<i32>, p: u8| {
5220            let iter = values.into_iter();
5221            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
5222                .with_precision_and_scale(p, 2)
5223                .unwrap()
5224        };
5225
5226        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5227        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
5228
5229        let mut buffer = Vec::with_capacity(1024);
5230        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5231        writer.write(&batch).unwrap();
5232        writer.close().unwrap();
5233
5234        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5235        let t1 = builder.parquet_schema().columns()[0].physical_type();
5236        assert_eq!(t1, PhysicalType::INT32);
5237
5238        let mut reader = builder.build().unwrap();
5239        assert_eq!(batch.schema(), reader.schema());
5240
5241        let out = reader.next().unwrap().unwrap();
5242        assert_eq!(batch, out);
5243    }
5244
5245    fn test_decimal64_roundtrip() {
5246        // Precision <= 9 -> INT32
5247        // Precision <= 18 -> INT64
5248
5249        let d = |values: Vec<i64>, p: u8| {
5250            let iter = values.into_iter();
5251            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5252                .with_precision_and_scale(p, 2)
5253                .unwrap()
5254        };
5255
5256        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5257        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5258        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5259
5260        let batch = RecordBatch::try_from_iter([
5261            ("d1", Arc::new(d1) as ArrayRef),
5262            ("d2", Arc::new(d2) as ArrayRef),
5263            ("d3", Arc::new(d3) as ArrayRef),
5264        ])
5265        .unwrap();
5266
5267        let mut buffer = Vec::with_capacity(1024);
5268        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5269        writer.write(&batch).unwrap();
5270        writer.close().unwrap();
5271
5272        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5273        let t1 = builder.parquet_schema().columns()[0].physical_type();
5274        assert_eq!(t1, PhysicalType::INT32);
5275        let t2 = builder.parquet_schema().columns()[1].physical_type();
5276        assert_eq!(t2, PhysicalType::INT64);
5277        let t3 = builder.parquet_schema().columns()[2].physical_type();
5278        assert_eq!(t3, PhysicalType::INT64);
5279
5280        let mut reader = builder.build().unwrap();
5281        assert_eq!(batch.schema(), reader.schema());
5282
5283        let out = reader.next().unwrap().unwrap();
5284        assert_eq!(batch, out);
5285    }
5286
5287    fn test_decimal_roundtrip<T: DecimalType>() {
5288        // Precision <= 9 -> INT32
5289        // Precision <= 18 -> INT64
5290        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5291
5292        let d = |values: Vec<usize>, p: u8| {
5293            let iter = values.into_iter().map(T::Native::usize_as);
5294            PrimitiveArray::<T>::from_iter_values(iter)
5295                .with_precision_and_scale(p, 2)
5296                .unwrap()
5297        };
5298
5299        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5300        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5301        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5302        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5303
5304        let batch = RecordBatch::try_from_iter([
5305            ("d1", Arc::new(d1) as ArrayRef),
5306            ("d2", Arc::new(d2) as ArrayRef),
5307            ("d3", Arc::new(d3) as ArrayRef),
5308            ("d4", Arc::new(d4) as ArrayRef),
5309        ])
5310        .unwrap();
5311
5312        let mut buffer = Vec::with_capacity(1024);
5313        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5314        writer.write(&batch).unwrap();
5315        writer.close().unwrap();
5316
5317        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5318        let t1 = builder.parquet_schema().columns()[0].physical_type();
5319        assert_eq!(t1, PhysicalType::INT32);
5320        let t2 = builder.parquet_schema().columns()[1].physical_type();
5321        assert_eq!(t2, PhysicalType::INT64);
5322        let t3 = builder.parquet_schema().columns()[2].physical_type();
5323        assert_eq!(t3, PhysicalType::INT64);
5324        let t4 = builder.parquet_schema().columns()[3].physical_type();
5325        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5326
5327        let mut reader = builder.build().unwrap();
5328        assert_eq!(batch.schema(), reader.schema());
5329
5330        let out = reader.next().unwrap().unwrap();
5331        assert_eq!(batch, out);
5332    }
5333
5334    #[test]
5335    fn test_decimal() {
5336        test_decimal32_roundtrip();
5337        test_decimal64_roundtrip();
5338        test_decimal_roundtrip::<Decimal128Type>();
5339        test_decimal_roundtrip::<Decimal256Type>();
5340    }
5341
5342    #[test]
5343    fn test_list_selection() {
5344        let schema = Arc::new(Schema::new(vec![Field::new_list(
5345            "list",
5346            Field::new_list_field(ArrowDataType::Utf8, true),
5347            false,
5348        )]));
5349        let mut buf = Vec::with_capacity(1024);
5350
5351        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5352
5353        for i in 0..2 {
5354            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5355            for j in 0..1024 {
5356                list_a_builder.values().append_value(format!("{i} {j}"));
5357                list_a_builder.append(true);
5358            }
5359            let batch =
5360                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5361                    .unwrap();
5362            writer.write(&batch).unwrap();
5363        }
5364        let _metadata = writer.close().unwrap();
5365
5366        let buf = Bytes::from(buf);
5367        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5368            .unwrap()
5369            .with_row_selection(RowSelection::from(vec![
5370                RowSelector::skip(100),
5371                RowSelector::select(924),
5372                RowSelector::skip(100),
5373                RowSelector::select(924),
5374            ]))
5375            .build()
5376            .unwrap();
5377
5378        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5379        let batch = concat_batches(&schema, &batches).unwrap();
5380
5381        assert_eq!(batch.num_rows(), 924 * 2);
5382        let list = batch.column(0).as_list::<i32>();
5383
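        // every list was written with exactly one element, so consecutive value offsets differ by 1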
5384        for w in list.value_offsets().windows(2) {
5385            assert_eq!(w[0] + 1, w[1])
5386        }
5387        let mut values = list.values().as_string::<i32>().iter();
5388
5389        for i in 0..2 {
5390            for j in 100..1024 {
5391                let expected = format!("{i} {j}");
5392                assert_eq!(values.next().unwrap().unwrap(), &expected);
5393            }
5394        }
5395    }
5396
5397    #[test]
5398    fn test_list_selection_fuzz() {
5399        let mut rng = rng();
5400        let schema = Arc::new(Schema::new(vec![Field::new_list(
5401            "list",
5402            Field::new_list(
5403                Field::LIST_FIELD_DEFAULT_NAME,
5404                Field::new_list_field(ArrowDataType::Int32, true),
5405                true,
5406            ),
5407            true,
5408        )]));
5409        let mut buf = Vec::with_capacity(1024);
5410        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5411
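        // Build 2048 rows of list<list<int32>> with ~20% null entries at each level
        // and random inner lengths in 0..10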
5412        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5413
5414        for _ in 0..2048 {
5415            if rng.random_bool(0.2) {
5416                list_a_builder.append(false);
5417                continue;
5418            }
5419
5420            let list_a_len = rng.random_range(0..10);
5421            let list_b_builder = list_a_builder.values();
5422
5423            for _ in 0..list_a_len {
5424                if rng.random_bool(0.2) {
5425                    list_b_builder.append(false);
5426                    continue;
5427                }
5428
5429                let list_b_len = rng.random_range(0..10);
5430                let int_builder = list_b_builder.values();
5431                for _ in 0..list_b_len {
5432                    match rng.random_bool(0.2) {
5433                        true => int_builder.append_null(),
5434                        false => int_builder.append_value(rng.random()),
5435                    }
5436                }
5437                list_b_builder.append(true);
5438            }
5439            list_a_builder.append(true);
5440        }
5441
5442        let array = Arc::new(list_a_builder.finish());
5443        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5444
5445        writer.write(&batch).unwrap();
5446        let _metadata = writer.close().unwrap();
5447
5448        let buf = Bytes::from(buf);
5449
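        // Each case alternates skips and selects that together cover all 2048 written rows.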
5450        let cases = [
5451            vec![
5452                RowSelector::skip(100),
5453                RowSelector::select(924),
5454                RowSelector::skip(100),
5455                RowSelector::select(924),
5456            ],
5457            vec![
5458                RowSelector::select(924),
5459                RowSelector::skip(100),
5460                RowSelector::select(924),
5461                RowSelector::skip(100),
5462            ],
5463            vec![
5464                RowSelector::skip(1023),
5465                RowSelector::select(1),
5466                RowSelector::skip(1023),
5467                RowSelector::select(1),
5468            ],
5469            vec![
5470                RowSelector::select(1),
5471                RowSelector::skip(1023),
5472                RowSelector::select(1),
5473                RowSelector::skip(1023),
5474            ],
5475        ];
5476
5477        for batch_size in [100, 1024, 2048] {
5478            for selection in &cases {
5479                let selection = RowSelection::from(selection.clone());
5480                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5481                    .unwrap()
5482                    .with_row_selection(selection.clone())
5483                    .with_batch_size(batch_size)
5484                    .build()
5485                    .unwrap();
5486
5487                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5488                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5489                assert_eq!(actual.num_rows(), selection.row_count());
5490
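                // Walk the selection, comparing every selected slice of the original
                // batch against the corresponding slice of the filtered output.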
5491                let mut batch_offset = 0;
5492                let mut actual_offset = 0;
5493                for selector in selection.iter() {
5494                    if selector.skip {
5495                        batch_offset += selector.row_count;
5496                        continue;
5497                    }
5498
5499                    assert_eq!(
5500                        batch.slice(batch_offset, selector.row_count),
5501                        actual.slice(actual_offset, selector.row_count)
5502                    );
5503
5504                    batch_offset += selector.row_count;
5505                    actual_offset += selector.row_count;
5506                }
5507            }
5508        }
5509    }
5510
5511    #[test]
5512    fn test_read_old_nested_list() {
5513        use arrow::datatypes::DataType;
5514        use arrow::datatypes::ToByteSlice;
5515
5516        let testdata = arrow::util::test_util::parquet_test_data();
5517        // message my_record {
5518        //     REQUIRED group a (LIST) {
5519        //         REPEATED group array (LIST) {
5520        //             REPEATED INT32 array;
5521        //         }
5522        //     }
5523        // }
5524        // should be read as list<list<int32>>
5525        let path = format!("{testdata}/old_list_structure.parquet");
5526        let test_file = File::open(path).unwrap();
5527
5528        // create expected ListArray
5529        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5530
5531        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5532        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5533
5534        // Construct a list array from the above two
5535        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5536            "array",
5537            DataType::Int32,
5538            false,
5539        ))))
5540        .len(2)
5541        .add_buffer(a_value_offsets)
5542        .add_child_data(a_values.into_data())
5543        .build()
5544        .unwrap();
5545        let a = ListArray::from(a_list_data);
5546
5547        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5548        let mut reader = builder.build().unwrap();
5549        let out = reader.next().unwrap().unwrap();
5550        assert_eq!(out.num_rows(), 1);
5551        assert_eq!(out.num_columns(), 1);
5552        // grab first column
5553        let c0 = out.column(0);
5554        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5555        // get first row: [[1, 2], [3, 4]]
5556        let r0 = c0arr.value(0);
5557        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5558        assert_eq!(r0arr, &a);
5559    }
5560
5561    #[test]
5562    fn test_map_no_value() {
5563        // File schema:
5564        // message schema {
5565        //   required group my_map (MAP) {
5566        //     repeated group key_value {
5567        //       required int32 key;
5568        //       optional int32 value;
5569        //     }
5570        //   }
5571        //   required group my_map_no_v (MAP) {
5572        //     repeated group key_value {
5573        //       required int32 key;
5574        //     }
5575        //   }
5576        //   required group my_list (LIST) {
5577        //     repeated group list {
5578        //       required int32 element;
5579        //     }
5580        //   }
5581        // }
5582        let testdata = arrow::util::test_util::parquet_test_data();
5583        let path = format!("{testdata}/map_no_value.parquet");
5584        let file = File::open(path).unwrap();
5585
5586        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5587            .unwrap()
5588            .build()
5589            .unwrap();
5590        let out = reader.next().unwrap().unwrap();
5591        assert_eq!(out.num_rows(), 3);
5592        assert_eq!(out.num_columns(), 3);
5593        // my_map_no_v (a MAP with no value field) is read as a list of keys, so it should match my_list
5594        let c0 = out.column(1).as_list::<i32>();
5595        let c1 = out.column(2).as_list::<i32>();
5596        assert_eq!(c0.len(), c1.len());
5597        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5598    }
5599
5600    #[test]
5601    fn test_row_filter_full_page_skip_is_handled() {
5602        let first_value: i64 = 1111;
5603        let last_value: i64 = 9999;
5604        let num_rows: usize = 12;
5605
5606        // Build data so that the resulting row selection has an average selector length of 4.
5607        // The written data looks like (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999),
5608        // so the row selection becomes [select 1111, skip 10, select 9999].
5609        let schema = Arc::new(Schema::new(vec![
5610            Field::new("key", arrow_schema::DataType::Int64, false),
5611            Field::new("value", arrow_schema::DataType::Int64, false),
5612        ]));
5613
5614        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
5615        int_values[0] = first_value;
5616        int_values[num_rows - 1] = last_value;
5617        let keys = Int64Array::from(int_values.clone());
5618        let values = Int64Array::from(int_values.clone());
5619        let batch = RecordBatch::try_new(
5620            Arc::clone(&schema),
5621            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
5622        )
5623        .unwrap();
5624
5625        let props = WriterProperties::builder()
5626            .set_write_batch_size(2)
5627            .set_data_page_row_count_limit(2)
5628            .build();
5629
5630        let mut buffer = Vec::new();
5631        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
5632        writer.write(&batch).unwrap();
5633        writer.close().unwrap();
5634        let data = Bytes::from(buffer);
5635
5636        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
5637        let builder =
5638            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
5639        let schema = builder.parquet_schema().clone();
5640        let filter_mask = ProjectionMask::leaves(&schema, [0]);
5641
5642        let make_predicate = |mask: ProjectionMask| {
5643            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
5644                let column = batch.column(0);
5645                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
5646                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
5647                or(&match_first, &match_second)
5648            })
5649        };
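        // With `set_data_page_row_count_limit(2)` the 12 rows span 6 data pages; the
        // predicate keeps only row 0 (1111) and row 11 (9999), so the 4 middle pages
        // match nothing and can be skipped entirely.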
5650
5651        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
5652        let predicate = make_predicate(filter_mask.clone());
5653
5654        // The batch size is set to 12 so that all surviving rows are read in a single batch after filtering.
5655        // If the reader used a bitmask-backed plan for the filter, it could panic because the 4 middle pages are never decoded.
5656        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
5657            .unwrap()
5658            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
5659            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
5660            .with_batch_size(12)
5661            .build()
5662            .unwrap();
5663
5664        // Predicate pruning used to panic when a mask-backed plan skipped whole pages.
5665        // Collecting the batches verifies that the plan now downgrades to selectors instead.
5666        let schema = reader.schema().clone();
5667        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5668        let result = concat_batches(&schema, &batches).unwrap();
5669        assert_eq!(result.num_rows(), 2);
5670    }
5671
5672    #[test]
5673    fn test_get_row_group_column_bloom_filter_with_length() {
5674        // Rewrite the test file into a new parquet file that records bloom_filter_length
5675        let testdata = arrow::util::test_util::parquet_test_data();
5676        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5677        let file = File::open(path).unwrap();
5678        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5679        let schema = builder.schema().clone();
5680        let reader = builder.build().unwrap();
5681
5682        let mut parquet_data = Vec::new();
5683        let props = WriterProperties::builder()
5684            .set_bloom_filter_enabled(true)
5685            .build();
5686        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
5687        for batch in reader {
5688            let batch = batch.unwrap();
5689            writer.write(&batch).unwrap();
5690        }
5691        writer.close().unwrap();
5692
5693        // test the new parquet file
5694        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
5695    }
5696
5697    #[test]
5698    fn test_get_row_group_column_bloom_filter_without_length() {
5699        let testdata = arrow::util::test_util::parquet_test_data();
5700        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5701        let data = Bytes::from(std::fs::read(path).unwrap());
5702        test_get_row_group_column_bloom_filter(data, false);
5703    }
5704
5705    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5706        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5707
5708        let metadata = builder.metadata();
5709        assert_eq!(metadata.num_row_groups(), 1);
5710        let row_group = metadata.row_group(0);
5711        let column = row_group.column(0);
5712        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5713
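        // Fetch the SBBF for column 0 of row group 0 and probe it with a value that
        // is present and one that is not.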
5714        let sbbf = builder
5715            .get_row_group_column_bloom_filter(0, 0)
5716            .unwrap()
5717            .unwrap();
5718        assert!(sbbf.check(&"Hello"));
5719        assert!(!sbbf.check(&"Hello_Not_Exists"));
5720    }
5721
5722    #[test]
5723    fn test_read_unknown_logical_type() {
5724        let testdata = arrow::util::test_util::parquet_test_data();
5725        let path = format!("{testdata}/unknown-logical-type.parquet");
5726        let test_file = File::open(path).unwrap();
5727
5728        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5729            .expect("Error creating reader builder");
5730
5731        let schema = builder.metadata().file_metadata().schema_descr();
5732        assert_eq!(
5733            schema.column(0).logical_type_ref(),
5734            Some(&LogicalType::String)
5735        );
5736        assert_eq!(
5737            schema.column(1).logical_type_ref(),
5738            Some(&LogicalType::_Unknown { field_id: 2555 })
5739        );
5740        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5741
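        // The unknown logical type is surfaced in the metadata, but the column can
        // still be read through its BYTE_ARRAY physical type.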
5742        let mut reader = builder.build().unwrap();
5743        let out = reader.next().unwrap().unwrap();
5744        assert_eq!(out.num_rows(), 3);
5745        assert_eq!(out.num_columns(), 2);
5746    }
5747
5748    #[test]
5749    fn test_read_row_numbers() {
5750        let file = write_parquet_from_iter(vec![(
5751            "value",
5752            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5753        )]);
5754        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5755
5756        let row_number_field = Arc::new(
5757            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5758        );
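        // RowNumber marks this field as a virtual column: it is not read from the
        // parquet data, but materialized by the reader as the Int64 row index.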
5759
5760        let options = ArrowReaderOptions::new()
5761            .with_schema(Arc::new(Schema::new(supplied_fields)))
5762            .with_virtual_columns(vec![row_number_field.clone()])
5763            .unwrap();
5764        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5765            file.try_clone().unwrap(),
5766            options,
5767        )
5768        .expect("reader builder with schema")
5769        .build()
5770        .expect("reader with schema");
5771
5772        let batch = arrow_reader.next().unwrap().unwrap();
5773        let schema = Arc::new(Schema::new(vec![
5774            Field::new("value", ArrowDataType::Int64, false),
5775            (*row_number_field).clone(),
5776        ]));
5777
5778        assert_eq!(batch.schema(), schema);
5779        assert_eq!(batch.num_columns(), 2);
5780        assert_eq!(batch.num_rows(), 3);
5781        assert_eq!(
5782            batch
5783                .column(0)
5784                .as_primitive::<types::Int64Type>()
5785                .iter()
5786                .collect::<Vec<_>>(),
5787            vec![Some(1), Some(2), Some(3)]
5788        );
5789        assert_eq!(
5790            batch
5791                .column(1)
5792                .as_primitive::<types::Int64Type>()
5793                .iter()
5794                .collect::<Vec<_>>(),
5795            vec![Some(0), Some(1), Some(2)]
5796        );
5797    }
5798
5799    #[test]
5800    fn test_read_only_row_numbers() {
5801        let file = write_parquet_from_iter(vec![(
5802            "value",
5803            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5804        )]);
5805        let row_number_field = Arc::new(
5806            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5807        );
5808        let options = ArrowReaderOptions::new()
5809            .with_virtual_columns(vec![row_number_field.clone()])
5810            .unwrap();
5811        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5812        let num_columns = metadata
5813            .metadata
5814            .file_metadata()
5815            .schema_descr()
5816            .num_columns();
5817
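        // With an all-none projection, only the virtual row number column is produced.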
5818        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5819            .with_projection(ProjectionMask::none(num_columns))
5820            .build()
5821            .expect("reader with schema");
5822
5823        let batch = arrow_reader.next().unwrap().unwrap();
5824        let schema = Arc::new(Schema::new(vec![row_number_field]));
5825
5826        assert_eq!(batch.schema(), schema);
5827        assert_eq!(batch.num_columns(), 1);
5828        assert_eq!(batch.num_rows(), 3);
5829        assert_eq!(
5830            batch
5831                .column(0)
5832                .as_primitive::<types::Int64Type>()
5833                .iter()
5834                .collect::<Vec<_>>(),
5835            vec![Some(0), Some(1), Some(2)]
5836        );
5837    }
5838
5839    #[test]
5840    fn test_read_row_numbers_row_group_order() -> Result<()> {
5841        // Make a parquet file with 100 rows split across 2 row groups
5842        let array = Int64Array::from_iter_values(5000..5100);
5843        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5844        let mut buffer = Vec::new();
5845        let options = WriterProperties::builder()
5846            .set_max_row_group_size(50)
5847            .build();
5848        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5849        // Write in 10-row batches, since the row group size limit is enforced after each batch
5850        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5851            writer.write(&batch_chunk)?;
5852        }
5853        writer.close()?;
5854
5855        let row_number_field = Arc::new(
5856            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5857        );
5858
5859        let buffer = Bytes::from(buffer);
5860
5861        let options =
5862            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5863
5864        // read out with normal options
5865        let arrow_reader =
5866            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5867                .build()?;
5868
5869        assert_eq!(
5870            ValuesAndRowNumbers {
5871                values: (5000..5100).collect(),
5872                row_numbers: (0..100).collect()
5873            },
5874            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5875        );
5876
5877        // Now read with the row groups out of order
5878        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5879            .with_row_groups(vec![1, 0])
5880            .build()?;
5881
5882        assert_eq!(
5883            ValuesAndRowNumbers {
5884                values: (5050..5100).chain(5000..5050).collect(),
5885                row_numbers: (50..100).chain(0..50).collect(),
5886            },
5887            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5888        );
5889
5890        Ok(())
5891    }
5892
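    /// Collects the `col` values and `row_number` values from every batch produced
    /// by a reader, for comparing expected and actual ordering.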
5893    #[derive(Debug, PartialEq)]
5894    struct ValuesAndRowNumbers {
5895        values: Vec<i64>,
5896        row_numbers: Vec<i64>,
5897    }
5898    impl ValuesAndRowNumbers {
5899        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5900            let mut values = vec![];
5901            let mut row_numbers = vec![];
5902            for batch in reader {
5903                let batch = batch.expect("Could not read batch");
5904                values.extend(
5905                    batch
5906                        .column_by_name("col")
5907                        .expect("Could not get col column")
5908                        .as_primitive::<arrow::datatypes::Int64Type>()
5909                        .iter()
5910                        .map(|v| v.expect("Could not get value")),
5911                );
5912
5913                row_numbers.extend(
5914                    batch
5915                        .column_by_name("row_number")
5916                        .expect("Could not get row_number column")
5917                        .as_primitive::<arrow::datatypes::Int64Type>()
5918                        .iter()
5919                        .map(|v| v.expect("Could not get row number"))
5920                        .collect::<Vec<_>>(),
5921                );
5922            }
5923            Self {
5924                values,
5925                row_numbers,
5926            }
5927        }
5928    }
5929
5930    #[test]
5931    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5932        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5933        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5934        assert_eq!(
5935            ArrowReaderOptions::new()
5936                .with_virtual_columns(vec![regular_field])
5937                .unwrap_err()
5938                .to_string(),
5939            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5940        );
5941    }
5942
5943    #[test]
5944    fn test_row_numbers_with_multiple_row_groups() {
5945        test_row_numbers_with_multiple_row_groups_helper(
5946            false,
5947            |path, selection, _row_filter, batch_size| {
5948                let file = File::open(path).unwrap();
5949                let row_number_field = Arc::new(
5950                    Field::new("row_number", ArrowDataType::Int64, false)
5951                        .with_extension_type(RowNumber),
5952                );
5953                let options = ArrowReaderOptions::new()
5954                    .with_virtual_columns(vec![row_number_field])
5955                    .unwrap();
5956                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5957                    .unwrap()
5958                    .with_row_selection(selection)
5959                    .with_batch_size(batch_size)
5960                    .build()
5961                    .expect("Could not create reader");
5962                reader
5963                    .collect::<Result<Vec<_>, _>>()
5964                    .expect("Could not read")
5965            },
5966        );
5967    }
5968
5969    #[test]
5970    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5971        test_row_numbers_with_multiple_row_groups_helper(
5972            true,
5973            |path, selection, row_filter, batch_size| {
5974                let file = File::open(path).unwrap();
5975                let row_number_field = Arc::new(
5976                    Field::new("row_number", ArrowDataType::Int64, false)
5977                        .with_extension_type(RowNumber),
5978                );
5979                let options = ArrowReaderOptions::new()
5980                    .with_virtual_columns(vec![row_number_field])
5981                    .unwrap();
5982                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5983                    .unwrap()
5984                    .with_row_selection(selection)
5985                    .with_batch_size(batch_size)
5986                    .with_row_filter(row_filter.expect("No filter"))
5987                    .build()
5988                    .expect("Could not create reader");
5989                reader
5990                    .collect::<Result<Vec<_>, _>>()
5991                    .expect("Could not read")
5992            },
5993        );
5994    }
5995
5996    #[test]
5997    fn test_read_row_group_indices() {
5998        // create a parquet file with 3 row groups, 2 rows each
5999        let array1 = Int64Array::from(vec![1, 2]);
6000        let array2 = Int64Array::from(vec![3, 4]);
6001        let array3 = Int64Array::from(vec![5, 6]);
6002
6003        let batch1 =
6004            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
6005        let batch2 =
6006            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
6007        let batch3 =
6008            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
6009
6010        let mut buffer = Vec::new();
6011        let options = WriterProperties::builder()
6012            .set_max_row_group_size(2)
6013            .build();
6014        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
6015        writer.write(&batch1).unwrap();
6016        writer.write(&batch2).unwrap();
6017        writer.write(&batch3).unwrap();
6018        writer.close().unwrap();
6019
6020        let file = Bytes::from(buffer);
6021        let row_group_index_field = Arc::new(
6022            Field::new("row_group_index", ArrowDataType::Int64, false)
6023                .with_extension_type(RowGroupIndex),
6024        );
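        // RowGroupIndex is a virtual column that yields, for each row, the index of
        // the row group it was read from.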
6025
6026        let options = ArrowReaderOptions::new()
6027            .with_virtual_columns(vec![row_group_index_field.clone()])
6028            .unwrap();
6029        let mut arrow_reader =
6030            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
6031                .expect("reader builder with virtual columns")
6032                .build()
6033                .expect("reader with virtual columns");
6034
6035        let batch = arrow_reader.next().unwrap().unwrap();
6036
6037        assert_eq!(batch.num_columns(), 2);
6038        assert_eq!(batch.num_rows(), 6);
6039
6040        assert_eq!(
6041            batch
6042                .column(0)
6043                .as_primitive::<types::Int64Type>()
6044                .iter()
6045                .collect::<Vec<_>>(),
6046            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
6047        );
6048
6049        assert_eq!(
6050            batch
6051                .column(1)
6052                .as_primitive::<types::Int64Type>()
6053                .iter()
6054                .collect::<Vec<_>>(),
6055            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
6056        );
6057    }
6058
6059    #[test]
6060    fn test_read_only_row_group_indices() {
6061        let array1 = Int64Array::from(vec![1, 2, 3]);
6062        let array2 = Int64Array::from(vec![4, 5]);
6063
6064        let batch1 =
6065            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
6066        let batch2 =
6067            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
6068
6069        let mut buffer = Vec::new();
6070        let options = WriterProperties::builder()
6071            .set_max_row_group_size(3)
6072            .build();
6073        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
6074        writer.write(&batch1).unwrap();
6075        writer.write(&batch2).unwrap();
6076        writer.close().unwrap();
6077
6078        let file = Bytes::from(buffer);
6079        let row_group_index_field = Arc::new(
6080            Field::new("row_group_index", ArrowDataType::Int64, false)
6081                .with_extension_type(RowGroupIndex),
6082        );
6083
6084        let options = ArrowReaderOptions::new()
6085            .with_virtual_columns(vec![row_group_index_field.clone()])
6086            .unwrap();
6087        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
6088        let num_columns = metadata
6089            .metadata
6090            .file_metadata()
6091            .schema_descr()
6092            .num_columns();
6093
6094        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
6095            .with_projection(ProjectionMask::none(num_columns))
6096            .build()
6097            .expect("reader with virtual columns only");
6098
6099        let batch = arrow_reader.next().unwrap().unwrap();
6100        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
6101
6102        assert_eq!(batch.schema(), schema);
6103        assert_eq!(batch.num_columns(), 1);
6104        assert_eq!(batch.num_rows(), 5);
6105
6106        assert_eq!(
6107            batch
6108                .column(0)
6109                .as_primitive::<types::Int64Type>()
6110                .iter()
6111                .collect::<Vec<_>>(),
6112            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
6113        );
6114    }
6115
6116    #[test]
6117    fn test_read_row_group_indices_with_selection() -> Result<()> {
6118        let mut buffer = Vec::new();
6119        let options = WriterProperties::builder()
6120            .set_max_row_group_size(10)
6121            .build();
6122
6123        let schema = Arc::new(Schema::new(vec![Field::new(
6124            "value",
6125            ArrowDataType::Int64,
6126            false,
6127        )]));
6128
6129        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
6130
6131        // write out 3 batches of 10 rows each
6132        for i in 0..3 {
6133            let start = i * 10;
6134            let array = Int64Array::from_iter_values(start..start + 10);
6135            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
6136            writer.write(&batch)?;
6137        }
6138        writer.close()?;
6139
6140        let file = Bytes::from(buffer);
6141        let row_group_index_field = Arc::new(
6142            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
6143        );
6144
6145        let options =
6146            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
6147
6148        // verify that row groups are read back in the requested (reverse) order
6149        let arrow_reader =
6150            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
6151                .with_row_groups(vec![2, 1, 0])
6152                .build()?;
6153
6154        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
6155        let combined = concat_batches(&batches[0].schema(), &batches)?;
6156
6157        let values = combined.column(0).as_primitive::<types::Int64Type>();
6158        let first_val = values.value(0);
6159        let last_val = values.value(combined.num_rows() - 1);
6160        // the first row comes from row group 2
6161        assert_eq!(first_val, 20);
6162        // the last row comes from row group 0
6163        assert_eq!(last_val, 9);
6164
6165        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
6166        assert_eq!(rg_indices.value(0), 2);
6167        assert_eq!(rg_indices.value(10), 1);
6168        assert_eq!(rg_indices.value(20), 0);
6169
6170        Ok(())
6171    }
6172
6173    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
6174        use_filter: bool,
6175        test_case: F,
6176    ) where
6177        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
6178    {
6179        let seed: u64 = random();
6180        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
6181        let mut rng = StdRng::seed_from_u64(seed);
6182
6183        use tempfile::TempDir;
6184        let tempdir = TempDir::new().expect("Could not create temp dir");
6185
6186        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
6187
6188        let path = tempdir.path().join("test.parquet");
6189        std::fs::write(&path, bytes).expect("Could not write file");
6190
6191        let mut case = vec![];
6192        let mut remaining = metadata.file_metadata().num_rows();
6193        while remaining > 0 {
6194            let row_count = rng.random_range(1..=remaining);
6195            remaining -= row_count;
6196            case.push(RowSelector {
6197                row_count: row_count as usize,
6198                skip: rng.random_bool(0.5),
6199            });
6200        }
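        // `case` now covers every row in the file with randomly sized skip/select runs.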
6201
6202        let filter = use_filter.then(|| {
6203            let filter = (0..metadata.file_metadata().num_rows())
6204                .map(|_| rng.random_bool(0.99))
6205                .collect::<Vec<_>>();
6206            let mut filter_offset = 0;
6207            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
6208                ProjectionMask::all(),
6209                move |b| {
6210                    let array = BooleanArray::from_iter(
6211                        filter
6212                            .iter()
6213                            .skip(filter_offset)
6214                            .take(b.num_rows())
6215                            .map(|x| Some(*x)),
6216                    );
6217                    filter_offset += b.num_rows();
6218                    Ok(array)
6219                },
6220            ))])
6221        });
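        // The optional filter keeps each row with probability 0.99, applied batch by
        // batch using a stateful offset into the precomputed mask.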
6222
6223        let selection = RowSelection::from(case);
6224        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
6225
6226        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
6227            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
6228            return;
6229        }
6230        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
6231            .expect("Failed to concatenate");
6232        // Not asserted: with a row filter applied, actual.num_rows() can be less than selection.row_count().
6233        let values = actual
6234            .column(0)
6235            .as_primitive::<types::Int64Type>()
6236            .iter()
6237            .collect::<Vec<_>>();
6238        let row_numbers = actual
6239            .column(1)
6240            .as_primitive::<types::Int64Type>()
6241            .iter()
6242            .collect::<Vec<_>>();
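        // The file stores value = row_number + 1 (values were written as 1..=N), so
        // the virtual row numbers must line up with the data column.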
6243        assert_eq!(
6244            row_numbers
6245                .into_iter()
6246                .map(|number| number.map(|number| number + 1))
6247                .collect::<Vec<_>>(),
6248            values
6249        );
6250    }
6251
6252    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
6253        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
6254            "value",
6255            ArrowDataType::Int64,
6256            false,
6257        )])));
6258
6259        let mut buf = Vec::with_capacity(1024);
6260        let mut writer =
6261            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
6262
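        // Write the values 1..=N in randomly sized batches, flushing after each batch
        // so the file ends up with multiple row groups.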
6263        let mut values = 1..=rng.random_range(1..4096);
6264        while !values.is_empty() {
6265            let batch_values = values
6266                .by_ref()
6267                .take(rng.random_range(1..4096))
6268                .collect::<Vec<_>>();
6269            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
6270            let batch =
6271                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
6272            writer.write(&batch).expect("Could not write batch");
6273            writer.flush().expect("Could not flush");
6274        }
6275        let metadata = writer.close().expect("Could not close writer");
6276
6277        (Bytes::from(buf), metadata)
6278    }
6279}