parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains a reader which reads Parquet data into Arrow [`RecordBatch`]es
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
72/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
79/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
80/// process, which is efficient and allows for the most low-level optimizations.
81///
82/// However, most Parquet based systems will apply filters at many steps prior
83/// to decoding such as pruning files, row groups and data pages. This crate
84/// provides the low level APIs needed to implement such filtering, but does not
85/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
94/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
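/// # Example: applying externally computed pruning
///
/// The following sketch (not part of the upstream documentation; the file name
/// and the pruning results are assumptions) shows how row group and row
/// selection decisions computed elsewhere might be passed to the builder:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};
/// # fn main() -> Result<(), parquet::errors::ParquetError> {
/// # let file = File::open("data.parquet")?;
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
///     // keep only the row groups that passed row group level pruning
///     .with_row_groups(vec![0, 2])
///     // within the remaining row groups, decode only the selected rows
///     .with_row_selection(RowSelection::from(vec![
///         RowSelector::select(100),
///         RowSelector::skip(900),
///     ]))
///     .build()?;
/// # for batch in reader { let _ = batch?; }
/// # Ok(())
/// # }
/// ```
///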
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
199    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
200    /// If `batch_size` is greater than the number of rows in the file, the file row count is used instead.
201    pub fn with_batch_size(self, batch_size: usize) -> Self {
202        // Try to avoid allocating a large buffer
203        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
204        Self { batch_size, ..self }
205    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
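    ///
    /// For example, a minimal sketch (the file name and leaf indices are
    /// assumptions) that reads only the first and third leaf columns:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
    /// let reader = builder.with_projection(mask).build()?;
    /// # for batch in reader { let _ = batch?; }
    /// # Ok(())
    /// # }
    /// ```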
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
225    /// Configure how row selections should be materialised during execution
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
250    /// It is recommended to enable reading the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
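    /// Expressed in code, a sketch of the selection above (assuming the four
    /// row group layout shown) could be built from [`RowSelector`]s like this:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // all of row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows of row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // rest of row group 2 + first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // remaining rows in row group 3
    /// ]);
    /// ```
    ///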
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
307    ///
308    /// See the [blog post on late materialization] for a more technical explanation.
309    ///
310    /// [blog post on late materialization]: https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive
311    ///
312    /// # Example
313    /// ```rust
314    /// # use std::fs::File;
315    /// # use arrow_array::Int32Array;
316    /// # use parquet::arrow::ProjectionMask;
317    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
318    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
319    /// # let testdata = arrow::util::test_util::parquet_test_data();
320    /// # let path = format!("{testdata}/alltypes_plain.parquet");
321    /// # let file = File::open(&path)?;
322    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
323    /// let schema_desc = builder.metadata().file_metadata().schema_descr_ptr();
324    /// // Create predicate that evaluates `int_col != 1`.
325    /// // `int_col` column has index 4 (zero based) in the schema
326    /// let projection = ProjectionMask::leaves(&schema_desc, [4]);
327    /// // Only the projection columns are passed to the predicate so
328    /// // int_col is column 0 in the predicate
329    /// let predicate = ArrowPredicateFn::new(projection, |batch| {
330    ///     let int_col = batch.column(0);
331    ///     arrow::compute::kernels::cmp::neq(int_col, &Int32Array::new_scalar(1))
332    /// });
333    /// let row_filter = RowFilter::new(vec![Box::new(predicate)]);
334    /// // The filter will be invoked during the reading process
335    /// let reader = builder.with_row_filter(row_filter).build()?;
336    /// # for b in reader { let _ = b?; }
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn with_row_filter(self, filter: RowFilter) -> Self {
341        Self {
342            filter: Some(filter),
343            ..self
344        }
345    }
346
347    /// Provide a limit to the number of rows to be read
348    ///
349    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
350    /// allowing it to limit the final set of rows decoded after any pushed down predicates
351    ///
352    /// It is recommended to enable reading the page index if using this functionality, to allow
353    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
354    pub fn with_limit(self, limit: usize) -> Self {
355        Self {
356            limit: Some(limit),
357            ..self
358        }
359    }
360
361    /// Provide an offset to skip over the given number of rows
362    ///
363    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
364    /// allowing it to skip rows after any pushed down predicates
365    ///
366    /// It is recommended to enable reading the page index if using this functionality, to allow
367    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
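    ///
    /// For example, a sketch (the file name is an assumption) that skips the
    /// first 100 surviving rows and then decodes at most the next 10:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_offset(100)
    ///     .with_limit(10)
    ///     .build()?;
    /// # for batch in reader { let _ = batch?; }
    /// # Ok(())
    /// # }
    /// ```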
368    pub fn with_offset(self, offset: usize) -> Self {
369        Self {
370            offset: Some(offset),
371            ..self
372        }
373    }
374
375    /// Specify metrics collection during reading
376    ///
377    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
378    /// clone of the provided metrics to the builder.
379    ///
380    /// For example:
381    ///
382    /// ```rust
383    /// # use std::sync::Arc;
384    /// # use bytes::Bytes;
385    /// # use arrow_array::{Int32Array, RecordBatch};
386    /// # use arrow_schema::{DataType, Field, Schema};
387    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
388    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
389    /// # use parquet::arrow::ArrowWriter;
390    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
391    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
392    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
393    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
394    /// # writer.write(&batch).unwrap();
395    /// # writer.close().unwrap();
396    /// # let file = Bytes::from(file);
397    /// // Create metrics object to pass into the reader
398    /// let metrics = ArrowReaderMetrics::enabled();
399    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
400    ///   // Configure the builder to use the metrics by passing a clone
401    ///   .with_metrics(metrics.clone())
402    ///   // Build the reader
403    ///   .build().unwrap();
404    /// // .. read data from the reader ..
405    ///
406    /// // check the metrics
407    /// assert!(metrics.records_read_from_inner().is_some());
408    /// ```
409    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
410        Self { metrics, ..self }
411    }
412
413    /// Set the maximum size (per row group) of the predicate cache in bytes for
414    /// the async decoder.
415    ///
416    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
417    /// unlimited cache size.
418    ///
419    /// This cache is used to store decoded arrays that are used in
420    /// predicate evaluation ([`Self::with_row_filter`]).
421    ///
422    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
423    /// [this ticket] for more details and alternatives.
424    ///
425    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
426    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
427    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
428        Self {
429            max_predicate_cache_size,
430            ..self
431        }
432    }
433}
434
435/// Options that control how [`ParquetMetaData`] is read when constructing
436/// an Arrow reader.
437///
438/// To use these options, pass them to one of the following methods:
439/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
440/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
441///
442/// For fine-grained control over metadata loading, use
443/// [`ArrowReaderMetadata::load`] to load metadata with these options.
444///
445/// See [`ArrowReaderBuilder`] for how to configure how the column data
446/// is then read from the file, including projection and filter pushdown.
447///
448/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
449#[derive(Debug, Clone, Default)]
450pub struct ArrowReaderOptions {
451    /// Should the reader strip any user defined metadata from the Arrow schema
452    skip_arrow_metadata: bool,
453    /// If provided, used as the schema hint when determining the Arrow schema,
454    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
455    ///
456    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
457    supplied_schema: Option<SchemaRef>,
458
459    pub(crate) column_index: PageIndexPolicy,
460    pub(crate) offset_index: PageIndexPolicy,
461
462    /// Options to control reading of Parquet metadata
463    metadata_options: ParquetMetaDataOptions,
464    /// If encryption is enabled, the file decryption properties can be provided
465    #[cfg(feature = "encryption")]
466    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
467
468    virtual_columns: Vec<FieldRef>,
469}
470
471impl ArrowReaderOptions {
472    /// Create a new [`ArrowReaderOptions`] with the default settings
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    /// Skip decoding the embedded arrow metadata (defaults to `false`)
478    ///
479    /// Parquet files generated by some writers may contain embedded arrow
480    /// schema and metadata.
481    /// This may not be correct or compatible with your system,
482    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
483    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
484        Self {
485            skip_arrow_metadata,
486            ..self
487        }
488    }
489
490    /// Provide a schema hint to use when reading the Parquet file.
491    ///
492    /// If provided, this schema takes precedence over any arrow schema embedded
493    /// in the metadata (see the [`arrow`] documentation for more details).
494    ///
495    /// If the provided schema is not compatible with the data stored in the
496    /// parquet file schema, an error will be returned when constructing the
497    /// builder.
498    ///
499    /// This option is only required if you want to explicitly control the
500    /// conversion of Parquet types to Arrow types, such as casting a column to
501    /// a different type. For example, if you wanted to read an Int64 column in
502    /// a Parquet file as a [`TimestampMicrosecondArray`] in the Arrow schema.
503    ///
504    /// [`arrow`]: crate::arrow
505    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
506    ///
507    /// # Notes
508    ///
509    /// The provided schema must have the same number of columns as the parquet schema and
510    /// the column names must be the same.
511    ///
512    /// # Example
513    /// ```
514    /// # use std::sync::Arc;
515    /// # use bytes::Bytes;
516    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
517    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
518    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
519    /// # use parquet::arrow::ArrowWriter;
520    /// // Write data - schema is inferred from the data to be Int32
521    /// let mut file = Vec::new();
522    /// let batch = RecordBatch::try_from_iter(vec![
523    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
524    /// ]).unwrap();
525    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
526    /// writer.write(&batch).unwrap();
527    /// writer.close().unwrap();
528    /// let file = Bytes::from(file);
529    ///
530    /// // Read the file back.
531    /// // Supply a schema that interprets the Int32 column as a Timestamp.
532    /// let supplied_schema = Arc::new(Schema::new(vec![
533    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
534    /// ]));
535    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
536    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
537    ///     file.clone(),
538    ///     options
539    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
540    ///
541    /// // Create the reader and read the data using the supplied schema.
542    /// let mut reader = builder.build().unwrap();
543    /// let _batch = reader.next().unwrap().unwrap();
544    /// ```
545    ///
546    /// # Example: Preserving Dictionary Encoding
547    ///
548    /// By default, Parquet string columns are read as `Utf8Array` (or `LargeUtf8Array`),
549    /// even if the underlying Parquet data uses dictionary encoding. You can preserve
550    /// the dictionary encoding by specifying a `Dictionary` type in the schema hint:
551    ///
552    /// ```
553    /// # use std::sync::Arc;
554    /// # use bytes::Bytes;
555    /// # use arrow_array::{ArrayRef, RecordBatch, StringArray};
556    /// # use arrow_schema::{DataType, Field, Schema};
557    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
558    /// # use parquet::arrow::ArrowWriter;
559    /// // Write a Parquet file with string data
560    /// let mut file = Vec::new();
561    /// let schema = Arc::new(Schema::new(vec![
562    ///     Field::new("city", DataType::Utf8, false)
563    /// ]));
564    /// let cities = StringArray::from(vec!["Berlin", "Berlin", "Paris", "Berlin", "Paris"]);
565    /// let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(cities)]).unwrap();
566    ///
567    /// let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
568    /// writer.write(&batch).unwrap();
569    /// writer.close().unwrap();
570    /// let file = Bytes::from(file);
571    ///
572    /// // Read the file back, requesting dictionary encoding preservation
573    /// let dict_schema = Arc::new(Schema::new(vec![
574    ///     Field::new("city", DataType::Dictionary(
575    ///         Box::new(DataType::Int32),
576    ///         Box::new(DataType::Utf8)
577    ///     ), false)
578    /// ]));
579    /// let options = ArrowReaderOptions::new().with_schema(dict_schema);
580    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
581    ///     file.clone(),
582    ///     options
583    /// ).unwrap();
584    ///
585    /// let mut reader = builder.build().unwrap();
586    /// let batch = reader.next().unwrap().unwrap();
587    ///
588    /// // The column is now a DictionaryArray
589    /// assert!(matches!(
590    ///     batch.column(0).data_type(),
591    ///     DataType::Dictionary(_, _)
592    /// ));
593    /// ```
594    ///
595    /// **Note**: Dictionary encoding preservation works best when:
596    /// 1. The original column was dictionary encoded (the default for string columns)
597    /// 2. There are a small number of distinct values
598    pub fn with_schema(self, schema: SchemaRef) -> Self {
599        Self {
600            supplied_schema: Some(schema),
601            skip_arrow_metadata: true,
602            ..self
603        }
604    }
605
606    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
607    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
608    ///
609    /// The `PageIndex` can be used by some query engines to push down predicates
610    /// to the parquet scan, potentially eliminating unnecessary IO.
611    ///
612    /// If this is enabled, [`ParquetMetaData::column_index`] and
613    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
614    /// information is present in the file.
615    ///
616    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
617    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
618    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
619    pub fn with_page_index(self, page_index: bool) -> Self {
620        self.with_page_index_policy(PageIndexPolicy::from(page_index))
621    }
622
623    /// Sets the [`PageIndexPolicy`] for both the column and offset indexes.
624    ///
625    /// The `PageIndex` consists of two structures: the `ColumnIndex` and `OffsetIndex`.
626    /// This method sets the same policy for both. For fine-grained control, use
627    /// [`Self::with_column_index_policy`] and [`Self::with_offset_index_policy`].
628    ///
629    /// See [`Self::with_page_index`] for more details on page indexes.
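    ///
    /// For example, a minimal sketch (assuming the `Optional` policy variant,
    /// which reads the indexes only when they are present in the file):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::PageIndexPolicy;
    /// let options = ArrowReaderOptions::new()
    ///     .with_page_index_policy(PageIndexPolicy::Optional);
    /// ```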
630    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
631        self.with_column_index_policy(policy)
632            .with_offset_index_policy(policy)
633    }
634
635    /// Sets the [`PageIndexPolicy`] for the Parquet [ColumnIndex] structure.
636    ///
637    /// The `ColumnIndex` contains min/max statistics for each page, which can be used
638    /// for predicate pushdown and page-level pruning.
639    ///
640    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
641    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
642        self.column_index = policy;
643        self
644    }
645
646    /// Sets the [`PageIndexPolicy`] for the Parquet [OffsetIndex] structure.
647    ///
648    /// The `OffsetIndex` contains the locations and sizes of each page, which enables
649    /// efficient page-level skipping and random access within column chunks.
650    ///
651    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
652    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
653        self.offset_index = policy;
654        self
655    }
656
657    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
658    /// footer will be skipped.
659    ///
660    /// This can be used to avoid reparsing the schema from the file when it is
661    /// already known.
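    ///
    /// For example, a sketch that reuses the schema from previously loaded
    /// metadata (the `previous_metadata` parameter is an assumption):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// fn options_with_known_schema(previous_metadata: &ParquetMetaData) -> ArrowReaderOptions {
    ///     // Reuse the already-parsed Parquet schema instead of reparsing the footer
    ///     let schema = previous_metadata.file_metadata().schema_descr_ptr();
    ///     ArrowReaderOptions::new().with_parquet_schema(schema)
    /// }
    /// ```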
662    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
663        self.metadata_options.set_schema(schema);
664        self
665    }
666
667    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
668    /// (defaults to `false`).
669    ///
670    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
671    /// might be desirable.
672    ///
673    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
674    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
675    /// [`encoding_stats`]:
676    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
677    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
678        self.metadata_options.set_encoding_stats_as_mask(val);
679        self
680    }
681
682    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
683    ///
684    /// [`encoding_stats`]:
685    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
686    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
687        self.metadata_options.set_encoding_stats_policy(policy);
688        self
689    }
690
691    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
692    ///
693    /// [`statistics`]:
694    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
695    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
696        self.metadata_options.set_column_stats_policy(policy);
697        self
698    }
699
700    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
701    ///
702    /// [`size_statistics`]:
703    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
704    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
705        self.metadata_options.set_size_stats_policy(policy);
706        self
707    }
708
709    /// Provide the file decryption properties to use when reading encrypted parquet files.
710    ///
711    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
712    #[cfg(feature = "encryption")]
713    pub fn with_file_decryption_properties(
714        self,
715        file_decryption_properties: Arc<FileDecryptionProperties>,
716    ) -> Self {
717        Self {
718            file_decryption_properties: Some(file_decryption_properties),
719            ..self
720        }
721    }
722
723    /// Include virtual columns in the output.
724    ///
725    /// Virtual columns are columns that are not part of the Parquet schema but are added to the output by the reader, such as row numbers and row group indices.
726    ///
727    /// # Example
728    /// ```
729    /// # use std::sync::Arc;
730    /// # use bytes::Bytes;
731    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
732    /// # use arrow_schema::{DataType, Field, Schema};
733    /// # use parquet::arrow::{ArrowWriter, RowNumber};
734    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
735    /// #
736    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
737    /// // Create a simple record batch with some data
738    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
739    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
740    ///
741    /// // Write the batch to an in-memory buffer
742    /// let mut file = Vec::new();
743    /// let mut writer = ArrowWriter::try_new(
744    ///     &mut file,
745    ///     batch.schema(),
746    ///     None
747    /// )?;
748    /// writer.write(&batch)?;
749    /// writer.close()?;
750    /// let file = Bytes::from(file);
751    ///
752    /// // Create a virtual column for row numbers
753    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
754    ///     .with_extension_type(RowNumber));
755    ///
756    /// // Configure options with virtual columns
757    /// let options = ArrowReaderOptions::new()
758    ///     .with_virtual_columns(vec![row_number_field])?;
759    ///
760    /// // Create a reader with the options
761    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
762    ///     file,
763    ///     options
764    /// )?
765    /// .build()?;
766    ///
767    /// // Read the batch - it will include both the original column and the virtual row_number column
768    /// let result_batch = reader.next().unwrap()?;
769    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
770    /// assert_eq!(result_batch.num_rows(), 3);
771    /// #
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
776        // Validate that all fields are virtual columns
777        for field in &virtual_columns {
778            if !is_virtual_column(field) {
779                return Err(ParquetError::General(format!(
780                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
781                    field.name()
782                )));
783            }
784        }
785        Ok(Self {
786            virtual_columns,
787            ..self
788        })
789    }
790
791    #[deprecated(
792        since = "57.2.0",
793        note = "Use `column_index_policy` or `offset_index_policy` instead"
794    )]
795    /// Returns whether page index reading is enabled.
796    ///
797    /// This returns `true` if both the column index and offset index policies are not [`PageIndexPolicy::Skip`].
798    ///
799    /// This can be set via [`with_page_index`][Self::with_page_index] or
800    /// [`with_page_index_policy`][Self::with_page_index_policy].
801    pub fn page_index(&self) -> bool {
802        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
803    }
804
805    /// Retrieve the currently set [`PageIndexPolicy`] for the offset index.
806    ///
807    /// This can be set via [`with_offset_index_policy`][Self::with_offset_index_policy]
808    /// or [`with_page_index_policy`][Self::with_page_index_policy].
809    pub fn offset_index_policy(&self) -> PageIndexPolicy {
810        self.offset_index
811    }
812
813    /// Retrieve the currently set [`PageIndexPolicy`] for the column index.
814    ///
815    /// This can be set via [`with_column_index_policy`][Self::with_column_index_policy]
816    /// or [`with_page_index_policy`][Self::with_page_index_policy].
817    pub fn column_index_policy(&self) -> PageIndexPolicy {
818        self.column_index
819    }
820
821    /// Retrieve the currently set metadata decoding options.
822    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
823        &self.metadata_options
824    }
825
826    /// Retrieve the currently set file decryption properties.
827    ///
828    /// This can be set via
829    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
830    #[cfg(feature = "encryption")]
831    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
832        self.file_decryption_properties.as_ref()
833    }
834}
835
836/// The metadata necessary to construct a [`ArrowReaderBuilder`]
837///
838/// Note this structure is cheaply clone-able as it consists of several `Arc`s.
839///
840/// This structure allows
841///
842/// 1. Loading metadata for a file once and then using that same metadata to
843///    construct multiple separate readers, for example, to distribute readers
844///    across multiple threads
845///
846/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
847///    from the file each time a reader is constructed.
848///
849/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
850#[derive(Debug, Clone)]
851pub struct ArrowReaderMetadata {
852    /// The Parquet Metadata, if known ahead of time
853    pub(crate) metadata: Arc<ParquetMetaData>,
854    /// The Arrow Schema
855    pub(crate) schema: SchemaRef,
856    /// The Parquet schema (root field)
857    pub(crate) fields: Option<Arc<ParquetField>>,
858}
859
860impl ArrowReaderMetadata {
861    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
862    /// and [`ChunkReader`]
863    ///
864    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
865    /// example of how this can be used
866    ///
867    /// # Notes
868    ///
869    /// If `options` has [`ArrowReaderOptions::with_page_index`] enabled, but
870    /// `Self::metadata` is missing the page index, this function will attempt
871    /// to load the page index with additional reads from `reader`.
872    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
873        let metadata = ParquetMetaDataReader::new()
874            .with_column_index_policy(options.column_index)
875            .with_offset_index_policy(options.offset_index)
876            .with_metadata_options(Some(options.metadata_options.clone()));
877        #[cfg(feature = "encryption")]
878        let metadata = metadata.with_decryption_properties(
879            options.file_decryption_properties.as_ref().map(Arc::clone),
880        );
881        let metadata = metadata.parse_and_finish(reader)?;
882        Self::try_new(Arc::new(metadata), options)
883    }
884
885    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
886    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
887    ///
888    /// # Notes
889    ///
890    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
891    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
892    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
893        match options.supplied_schema {
894            Some(supplied_schema) => Self::with_supplied_schema(
895                metadata,
896                supplied_schema.clone(),
897                &options.virtual_columns,
898            ),
899            None => {
900                let kv_metadata = match options.skip_arrow_metadata {
901                    true => None,
902                    false => metadata.file_metadata().key_value_metadata(),
903                };
904
905                let (schema, fields) = parquet_to_arrow_schema_and_fields(
906                    metadata.file_metadata().schema_descr(),
907                    ProjectionMask::all(),
908                    kv_metadata,
909                    &options.virtual_columns,
910                )?;
911
912                Ok(Self {
913                    metadata,
914                    schema: Arc::new(schema),
915                    fields: fields.map(Arc::new),
916                })
917            }
918        }
919    }
920
921    fn with_supplied_schema(
922        metadata: Arc<ParquetMetaData>,
923        supplied_schema: SchemaRef,
924        virtual_columns: &[FieldRef],
925    ) -> Result<Self> {
926        let parquet_schema = metadata.file_metadata().schema_descr();
927        let field_levels = parquet_to_arrow_field_levels_with_virtual(
928            parquet_schema,
929            ProjectionMask::all(),
930            Some(supplied_schema.fields()),
931            virtual_columns,
932        )?;
933        let fields = field_levels.fields;
934        let inferred_len = fields.len();
935        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
936        // Ensure the supplied schema has the same number of columns as the parquet schema.
937        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
938        // different lengths, but we check here to be safe.
939        if inferred_len != supplied_len {
940            return Err(arrow_err!(format!(
941                "Incompatible supplied Arrow schema: expected {} columns received {}",
942                inferred_len, supplied_len
943            )));
944        }
945
946        let mut errors = Vec::new();
947
948        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
949
950        for (field1, field2) in field_iter {
951            if field1.data_type() != field2.data_type() {
952                errors.push(format!(
953                    "data type mismatch for field {}: requested {} but found {}",
954                    field1.name(),
955                    field1.data_type(),
956                    field2.data_type()
957                ));
958            }
959            if field1.is_nullable() != field2.is_nullable() {
960                errors.push(format!(
961                    "nullability mismatch for field {}: expected {:?} but found {:?}",
962                    field1.name(),
963                    field1.is_nullable(),
964                    field2.is_nullable()
965                ));
966            }
967            if field1.metadata() != field2.metadata() {
968                errors.push(format!(
969                    "metadata mismatch for field {}: expected {:?} but found {:?}",
970                    field1.name(),
971                    field1.metadata(),
972                    field2.metadata()
973                ));
974            }
975        }
976
977        if !errors.is_empty() {
978            let message = errors.join(", ");
979            return Err(ParquetError::ArrowError(format!(
980                "Incompatible supplied Arrow schema: {message}",
981            )));
982        }
983
984        Ok(Self {
985            metadata,
986            schema: supplied_schema,
987            fields: field_levels.levels.map(Arc::new),
988        })
989    }
990
991    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
992    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
993        &self.metadata
994    }
995
996    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
997    pub fn parquet_schema(&self) -> &SchemaDescriptor {
998        self.metadata.file_metadata().schema_descr()
999    }
1000
1001    /// Returns the arrow [`SchemaRef`] for this parquet file
1002    pub fn schema(&self) -> &SchemaRef {
1003        &self.schema
1004    }
1005}
1006
1007#[doc(hidden)]
1008// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
1009pub struct SyncReader<T: ChunkReader>(T);
1010
1011impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1012    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1013        f.debug_tuple("SyncReader").field(&self.0).finish()
1014    }
1015}
1016
1017/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
1018///
1019/// # See Also
1020/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
1021/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
1022/// * [`ArrowReaderBuilder`] for additional member functions
1023pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1024
1025impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1026    /// Create a new [`ParquetRecordBatchReaderBuilder`]
1027    ///
1028    /// ```
1029    /// # use std::sync::Arc;
1030    /// # use bytes::Bytes;
1031    /// # use arrow_array::{Int32Array, RecordBatch};
1032    /// # use arrow_schema::{DataType, Field, Schema};
1033    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1034    /// # use parquet::arrow::ArrowWriter;
1035    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1036    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1037    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1038    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1039    /// # writer.write(&batch).unwrap();
1040    /// # writer.close().unwrap();
1041    /// # let file = Bytes::from(file);
1042    /// // Build the reader from anything that implements `ChunkReader`
1043    /// // such as a `File`, or `Bytes`
1044    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045    /// // The builder has access to ParquetMetaData such
1046    /// // as the number and layout of row groups
1047    /// assert_eq!(builder.metadata().num_row_groups(), 1);
1048    /// // Call build to create the reader
1049    /// let mut reader: ParquetRecordBatchReader = builder.build().unwrap();
1050    /// // Read data
1051    /// while let Some(batch) = reader.next().transpose()? {
1052    ///     println!("Read {} rows", batch.num_rows());
1053    /// }
1054    /// # Ok::<(), parquet::errors::ParquetError>(())
1055    /// ```
1056    pub fn try_new(reader: T) -> Result<Self> {
1057        Self::try_new_with_options(reader, Default::default())
1058    }
1059
1060    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
1061    ///
1062    /// Use this method if you want to control the options for reading the
1063    /// [`ParquetMetaData`]
1064    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1065        let metadata = ArrowReaderMetadata::load(&reader, options)?;
1066        Ok(Self::new_with_metadata(reader, metadata))
1067    }
1068
1069    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
1070    ///
1071    /// Use this method if you already have [`ParquetMetaData`] for a file.
1072    /// This interface allows:
1073    ///
1074    /// 1. Loading metadata once and using it to create multiple builders with
1075    ///    potentially different settings or run on different threads
1076    ///
1077    /// 2. Using a cached copy of the metadata rather than re-reading it from the
1078    ///    file each time a reader is constructed.
1079    ///
1080    /// See the docs on [`ArrowReaderMetadata`] for more details
1081    ///
1082    /// # Example
1083    /// ```
1084    /// # use std::fs::metadata;
1085    /// # use std::sync::Arc;
1086    /// # use bytes::Bytes;
1087    /// # use arrow_array::{Int32Array, RecordBatch};
1088    /// # use arrow_schema::{DataType, Field, Schema};
1089    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1090    /// # use parquet::arrow::ArrowWriter;
1091    /// #
1092    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
1093    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
1094    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1095    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
1096    /// # writer.write(&batch).unwrap();
1097    /// # writer.close().unwrap();
1098    /// # let file = Bytes::from(file);
1099    /// #
1100    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
1101    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
1102    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
1103    ///
1104    /// // Should be able to read from both in parallel
1105    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
1106    /// ```
1107    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1108        Self::new_builder(SyncReader(input), metadata)
1109    }
1110
1111    /// Read bloom filter for a column in a row group
1112    ///
1113    /// Returns `None` if the column does not have a bloom filter
1114    ///
1115    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
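    ///
    /// # Example
    ///
    /// A sketch (the file name, indices, and probed value are assumptions)
    /// that probes the filter before deciding whether a row group could
    /// contain a value:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn main() -> Result<(), parquet::errors::ParquetError> {
    /// # let file = File::open("data.parquet")?;
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // Bloom filters can report false positives but never false negatives
    ///     if !sbbf.check(&"needle") {
    ///         // the value is definitely not present in this column chunk
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```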
1116    pub fn get_row_group_column_bloom_filter(
1117        &self,
1118        row_group_idx: usize,
1119        column_idx: usize,
1120    ) -> Result<Option<Sbbf>> {
1121        let metadata = self.metadata.row_group(row_group_idx);
1122        let column_metadata = metadata.column(column_idx);
1123
1124        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1125            offset
1126                .try_into()
1127                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1128        } else {
1129            return Ok(None);
1130        };
1131
1132        let buffer = match column_metadata.bloom_filter_length() {
1133            Some(length) => self.input.0.get_bytes(offset, length as usize),
1134            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1135        }?;
1136
1137        let (header, bitset_offset) =
1138            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1139
1140        match header.algorithm {
1141            BloomFilterAlgorithm::BLOCK => {
1142                // this match exists to future proof the singleton algorithm enum
1143            }
1144        }
1145        match header.compression {
1146            BloomFilterCompression::UNCOMPRESSED => {
1147                // this match exists to future proof the singleton compression enum
1148            }
1149        }
1150        match header.hash {
1151            BloomFilterHash::XXHASH => {
1152                // this match exists to future proof the singleton hash enum
1153            }
1154        }
1155
1156        let bitset = match column_metadata.bloom_filter_length() {
1157            Some(_) => buffer.slice(
1158                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1159                    - TryInto::<usize>::try_into(offset).unwrap())..,
1160            ),
1161            None => {
1162                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1163                    ParquetError::General("Bloom filter length is invalid".to_string())
1164                })?;
1165                self.input.0.get_bytes(bitset_offset, bitset_length)?
1166            }
1167        };
1168        Ok(Some(Sbbf::new(&bitset)))
1169    }
1170
1171    /// Build a [`ParquetRecordBatchReader`]
1172    ///
1173    /// Note: this will eagerly evaluate any `RowFilter` before returning
1174    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1175        let Self {
1176            input,
1177            metadata,
1178            schema: _,
1179            fields,
1180            batch_size,
1181            row_groups,
1182            projection,
1183            mut filter,
1184            selection,
1185            row_selection_policy,
1186            limit,
1187            offset,
1188            metrics,
1189            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1190            max_predicate_cache_size: _,
1191        } = self;
1192
1193        // Try to avoid allocating a large buffer
1194        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1195
1196        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1197
1198        let reader = ReaderRowGroups {
1199            reader: Arc::new(input.0),
1200            metadata,
1201            row_groups,
1202        };
1203
1204        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1205            .with_selection(selection)
1206            .with_row_selection_policy(row_selection_policy);
1207
1208        // Update selection based on any filters
1209        if let Some(filter) = filter.as_mut() {
1210            for predicate in filter.predicates.iter_mut() {
1211                // break early if we have ruled out all rows
1212                if !plan_builder.selects_any() {
1213                    break;
1214                }
1215
1216                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1217                    .with_parquet_metadata(&reader.metadata)
1218                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1219
1220                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1221            }
1222        }
1223
1224        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1225            .with_parquet_metadata(&reader.metadata)
1226            .build_array_reader(fields.as_deref(), &projection)?;
1227
1228        let read_plan = plan_builder
1229            .limited(reader.num_rows())
1230            .with_offset(offset)
1231            .with_limit(limit)
1232            .build_limited()
1233            .build();
1234
1235        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1236    }
1237}
1238
1239struct ReaderRowGroups<T: ChunkReader> {
1240    reader: Arc<T>,
1241
1242    metadata: Arc<ParquetMetaData>,
1243    /// List of row group indices to scan
1244    row_groups: Vec<usize>,
1245}
1246
1247impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1248    fn num_rows(&self) -> usize {
1249        let meta = self.metadata.row_groups();
1250        self.row_groups
1251            .iter()
1252            .map(|x| meta[*x].num_rows() as usize)
1253            .sum()
1254    }
1255
1256    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1257        Ok(Box::new(ReaderPageIterator {
1258            column_idx: i,
1259            reader: self.reader.clone(),
1260            metadata: self.metadata.clone(),
1261            row_groups: self.row_groups.clone().into_iter(),
1262        }))
1263    }
1264
1265    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1266        Box::new(
1267            self.row_groups
1268                .iter()
1269                .map(move |i| self.metadata.row_group(*i)),
1270        )
1271    }
1272
1273    fn metadata(&self) -> &ParquetMetaData {
1274        self.metadata.as_ref()
1275    }
1276}
1277
1278struct ReaderPageIterator<T: ChunkReader> {
1279    reader: Arc<T>,
1280    column_idx: usize,
1281    row_groups: std::vec::IntoIter<usize>,
1282    metadata: Arc<ParquetMetaData>,
1283}
1284
1285impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1286    /// Return the next SerializedPageReader
1287    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1288        let rg = self.metadata.row_group(rg_idx);
1289        let column_chunk_metadata = rg.column(self.column_idx);
1290        let offset_index = self.metadata.offset_index();
1291        // The `offset_index` may not exist, in which case `i[rg_idx]` will be empty.
1292        // Filter out empty `i[rg_idx]` to avoid a panic when indexing `i[rg_idx][self.column_idx]`.
1293        let page_locations = offset_index
1294            .filter(|i| !i[rg_idx].is_empty())
1295            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1296        let total_rows = rg.num_rows() as usize;
1297        let reader = self.reader.clone();
1298
1299        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1300            .add_crypto_context(
1301                rg_idx,
1302                self.column_idx,
1303                self.metadata.as_ref(),
1304                column_chunk_metadata,
1305            )
1306    }
1307}
1308
1309impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1310    type Item = Result<Box<dyn PageReader>>;
1311
1312    fn next(&mut self) -> Option<Self::Item> {
1313        let rg_idx = self.row_groups.next()?;
1314        let page_reader = self
1315            .next_page_reader(rg_idx)
1316            .map(|page_reader| Box::new(page_reader) as _);
1317        Some(page_reader)
1318    }
1319}
1320
1321impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1322
1323/// Reads Parquet data as Arrow [`RecordBatch`]es
1324///
1325/// This struct implements the [`RecordBatchReader`] trait and is an
1326/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
1327///
1328/// Typically reads either from a file or from an in-memory buffer of [`Bytes`].
1329///
1330/// Created by [`ParquetRecordBatchReaderBuilder`]
1331///
1332/// [`Bytes`]: bytes::Bytes
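///
/// # Example
///
/// A minimal sketch of reading batches from an in-memory buffer; it assumes that
/// `data` already contains the bytes of a complete Parquet file:
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// # let data = Bytes::new(); // placeholder: real Parquet bytes are assumed here
/// let reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("read {} rows", batch.num_rows());
/// }
/// ```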
1333pub struct ParquetRecordBatchReader {
1334    array_reader: Box<dyn ArrayReader>,
1335    schema: SchemaRef,
1336    read_plan: ReadPlan,
1337}
1338
1339impl Debug for ParquetRecordBatchReader {
1340    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1341        f.debug_struct("ParquetRecordBatchReader")
1342            .field("array_reader", &"...")
1343            .field("schema", &self.schema)
1344            .field("read_plan", &self.read_plan)
1345            .finish()
1346    }
1347}
1348
1349impl Iterator for ParquetRecordBatchReader {
1350    type Item = Result<RecordBatch, ArrowError>;
1351
1352    fn next(&mut self) -> Option<Self::Item> {
1353        self.next_inner()
1354            .map_err(|arrow_err| arrow_err.into())
1355            .transpose()
1356    }
1357}
1358
1359impl ParquetRecordBatchReader {
1360    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1361    /// has reached the end of the file.
1362    ///
1363    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1364    /// simplify error handling with `?`
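    ///
    /// How rows are read depends on the active [`RowSelectionCursor`]: a boolean mask,
    /// a queue of [`RowSelector`] ranges, or `All` when no selection was supplied.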
1365    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1366        let mut read_records = 0;
1367        let batch_size = self.batch_size();
1368        if batch_size == 0 {
1369            return Ok(None);
1370        }
1371        match self.read_plan.row_selection_cursor_mut() {
1372            RowSelectionCursor::Mask(mask_cursor) => {
1373                // Stream the record batch reader using contiguous segments of the selection
1374                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
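                // Each mask chunk drives three steps: skip `initial_skip` unselected leading
                // rows, decode `chunk_rows` rows from the column readers, then apply the
                // boolean mask so that only `selected_rows` rows remain in the returned batch.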
1375                while !mask_cursor.is_empty() {
1376                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1377                        return Ok(None);
1378                    };
1379
1380                    if mask_chunk.initial_skip > 0 {
1381                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1382                        if skipped != mask_chunk.initial_skip {
1383                            return Err(general_err!(
1384                                "failed to skip rows, expected {}, got {}",
1385                                mask_chunk.initial_skip,
1386                                skipped
1387                            ));
1388                        }
1389                    }
1390
1391                    if mask_chunk.chunk_rows == 0 {
1392                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1393                            return Ok(None);
1394                        }
1395                        continue;
1396                    }
1397
1398                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1399
1400                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1401                    if read == 0 {
1402                        return Err(general_err!(
1403                            "reached end of column while expecting {} rows",
1404                            mask_chunk.chunk_rows
1405                        ));
1406                    }
1407                    if read != mask_chunk.chunk_rows {
1408                        return Err(general_err!(
1409                            "insufficient rows read from array reader - expected {}, got {}",
1410                            mask_chunk.chunk_rows,
1411                            read
1412                        ));
1413                    }
1414
1415                    let array = self.array_reader.consume_batch()?;
1416                    // The column reader exposes the projection as a struct array; convert this
1417                    // into a record batch before applying the boolean filter mask.
1418                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1419                        ArrowError::ParquetError(
1420                            "Struct array reader should return struct array".to_string(),
1421                        )
1422                    })?;
1423
1424                    let filtered_batch =
1425                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1426
1427                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1428                        return Err(general_err!(
1429                            "filtered rows mismatch selection - expected {}, got {}",
1430                            mask_chunk.selected_rows,
1431                            filtered_batch.num_rows()
1432                        ));
1433                    }
1434
1435                    if filtered_batch.num_rows() == 0 {
1436                        continue;
1437                    }
1438
1439                    return Ok(Some(filtered_batch));
1440                }
1441            }
1442            RowSelectionCursor::Selectors(selectors_cursor) => {
1443                while read_records < batch_size && !selectors_cursor.is_empty() {
1444                    let front = selectors_cursor.next_selector();
1445                    if front.skip {
1446                        let skipped = self.array_reader.skip_records(front.row_count)?;
1447
1448                        if skipped != front.row_count {
1449                            return Err(general_err!(
1450                                "failed to skip rows, expected {}, got {}",
1451                                front.row_count,
1452                                skipped
1453                            ));
1454                        }
1455                        continue;
1456                    }
1457
1458                    // Currently, a RowSelector with row_count = 0 is interpreted as the end of the reader,
1459                    // so skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1460                    if front.row_count == 0 {
1461                        continue;
1462                    }
1463
1464                    // Try to read the remaining records needed for this batch
1465                    let need_read = batch_size - read_records;
1466                    let to_read = match front.row_count.checked_sub(need_read) {
1467                        Some(remaining) if remaining != 0 => {
1468                            // The selector has more rows than this batch needs: read only `need_read` rows
1469                            // and return the remainder (the `remaining != 0` guard avoids re-queueing an empty selector).
1470                            selectors_cursor.return_selector(RowSelector::select(remaining));
1471                            need_read
1472                        }
1473                        _ => front.row_count,
1474                    };
1475                    match self.array_reader.read_records(to_read)? {
1476                        0 => break,
1477                        rec => read_records += rec,
1478                    };
1479                }
1480            }
1481            RowSelectionCursor::All => {
1482                self.array_reader.read_records(batch_size)?;
1483            }
1484        };
1485
1486        let array = self.array_reader.consume_batch()?;
1487        let struct_array = array.as_struct_opt().ok_or_else(|| {
1488            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1489        })?;
1490
1491        Ok(if struct_array.len() > 0 {
1492            Some(RecordBatch::from(struct_array))
1493        } else {
1494            None
1495        })
1496    }
1497}
1498
1499impl RecordBatchReader for ParquetRecordBatchReader {
1500    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1501    ///
1502    /// Note that the schema metadata will be stripped here. See
1503    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1504    fn schema(&self) -> SchemaRef {
1505        self.schema.clone()
1506    }
1507}
1508
1509impl ParquetRecordBatchReader {
1510    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1511    ///
1512    /// See [`ParquetRecordBatchReaderBuilder`] for more options
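    ///
    /// The example below is a sketch of the equivalent construction via the builder,
    /// which exposes additional options; the path `"data.parquet"` is a placeholder:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_batch_size(8192)
    ///     .build()
    ///     .unwrap();
    /// ```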
1513    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1514        ParquetRecordBatchReaderBuilder::try_new(reader)?
1515            .with_batch_size(batch_size)
1516            .build()
1517    }
1518
1519    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1520    ///
1521    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
1522    /// higher-level interface for reading parquet data from a file.
1523    pub fn try_new_with_row_groups(
1524        levels: &FieldLevels,
1525        row_groups: &dyn RowGroups,
1526        batch_size: usize,
1527        selection: Option<RowSelection>,
1528    ) -> Result<Self> {
1529        // note metrics are not supported in this API
1530        let metrics = ArrowReaderMetrics::disabled();
1531        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1532            .with_parquet_metadata(row_groups.metadata())
1533            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1534
1535        let read_plan = ReadPlanBuilder::new(batch_size)
1536            .with_selection(selection)
1537            .build();
1538
1539        Ok(Self {
1540            array_reader,
1541            schema: Arc::new(Schema::new(levels.fields.clone())),
1542            read_plan,
1543        })
1544    }
1545
1546    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1547    /// a time from [`ArrayReader`], following the provided [`ReadPlan`]. If the plan contains
1548    /// no selection, all rows are returned.
1549    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1550        let schema = match array_reader.get_data_type() {
1551            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1552            _ => unreachable!("Struct array reader's data type is not struct!"),
1553        };
1554
1555        Self {
1556            array_reader,
1557            schema: Arc::new(schema),
1558            read_plan,
1559        }
1560    }
1561
1562    #[inline(always)]
1563    pub(crate) fn batch_size(&self) -> usize {
1564        self.read_plan.batch_size()
1565    }
1566}
1567
1568#[cfg(test)]
1569pub(crate) mod tests {
1570    use std::cmp::min;
1571    use std::collections::{HashMap, VecDeque};
1572    use std::fmt::Formatter;
1573    use std::fs::File;
1574    use std::io::Seek;
1575    use std::path::PathBuf;
1576    use std::sync::Arc;
1577
1578    use rand::rngs::StdRng;
1579    use rand::{Rng, RngCore, SeedableRng, random, rng};
1580    use tempfile::tempfile;
1581
1582    use crate::arrow::arrow_reader::{
1583        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
1584        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
1585        RowSelectionPolicy, RowSelector,
1586    };
1587    use crate::arrow::schema::{
1588        add_encoded_arrow_schema_to_metadata,
1589        virtual_type::{RowGroupIndex, RowNumber},
1590    };
1591    use crate::arrow::{ArrowWriter, ProjectionMask};
1592    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1593    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1594    use crate::data_type::{
1595        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1596        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1597    };
1598    use crate::errors::Result;
1599    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1600    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1601    use crate::file::writer::SerializedFileWriter;
1602    use crate::schema::parser::parse_message_type;
1603    use crate::schema::types::{Type, TypePtr};
1604    use crate::util::test_common::rand_gen::RandGen;
1605    use arrow::compute::kernels::cmp::eq;
1606    use arrow::compute::or;
1607    use arrow_array::builder::*;
1608    use arrow_array::cast::AsArray;
1609    use arrow_array::types::{
1610        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1611        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1612        Time64MicrosecondType,
1613    };
1614    use arrow_array::*;
1615    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1616    use arrow_data::{ArrayData, ArrayDataBuilder};
1617    use arrow_schema::{
1618        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1619    };
1620    use arrow_select::concat::concat_batches;
1621    use bytes::Bytes;
1622    use half::f16;
1623    use num_traits::PrimInt;
1624
1625    #[test]
1626    fn test_arrow_reader_all_columns() {
1627        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1628
1629        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1630        let original_schema = Arc::clone(builder.schema());
1631        let reader = builder.build().unwrap();
1632
1633        // Verify that the schema was correctly parsed
1634        assert_eq!(original_schema.fields(), reader.schema().fields());
1635    }
1636
1637    #[test]
1638    fn test_reuse_schema() {
1639        let file = get_test_file("parquet/alltypes-java.parquet");
1640
1641        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1642        let expected = builder.metadata;
1643        let schema = expected.file_metadata().schema_descr_ptr();
1644
1645        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1646        let builder =
1647            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1648
1649        // Verify that the metadata matches
1650        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1651    }
1652
1653    #[test]
1654    fn test_page_encoding_stats_mask() {
1655        let testdata = arrow::util::test_util::parquet_test_data();
1656        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1657        let file = File::open(path).unwrap();
1658
1659        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1660        let builder =
1661            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1662
1663        let row_group_metadata = builder.metadata.row_group(0);
1664
1665        // test page encoding stats
1666        let page_encoding_stats = row_group_metadata
1667            .column(0)
1668            .page_encoding_stats_mask()
1669            .unwrap();
1670        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1671        let page_encoding_stats = row_group_metadata
1672            .column(2)
1673            .page_encoding_stats_mask()
1674            .unwrap();
1675        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1676    }
1677
1678    #[test]
1679    fn test_stats_stats_skipped() {
1680        let testdata = arrow::util::test_util::parquet_test_data();
1681        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1682        let file = File::open(path).unwrap();
1683
1684        // test skipping all
1685        let arrow_options = ArrowReaderOptions::new()
1686            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1687            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1688        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1689            file.try_clone().unwrap(),
1690            arrow_options,
1691        )
1692        .unwrap();
1693
1694        let row_group_metadata = builder.metadata.row_group(0);
1695        for column in row_group_metadata.columns() {
1696            assert!(column.page_encoding_stats().is_none());
1697            assert!(column.page_encoding_stats_mask().is_none());
1698            assert!(column.statistics().is_none());
1699        }
1700
1701        // test skipping all but one column and converting to mask
1702        let arrow_options = ArrowReaderOptions::new()
1703            .with_encoding_stats_as_mask(true)
1704            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1705            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1706        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1707            file.try_clone().unwrap(),
1708            arrow_options,
1709        )
1710        .unwrap();
1711
1712        let row_group_metadata = builder.metadata.row_group(0);
1713        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1714            assert!(column.page_encoding_stats().is_none());
1715            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1716            assert_eq!(column.statistics().is_some(), idx == 0);
1717        }
1718    }
1719
1720    #[test]
1721    fn test_size_stats_stats_skipped() {
1722        let testdata = arrow::util::test_util::parquet_test_data();
1723        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1724        let file = File::open(path).unwrap();
1725
1726        // test skipping all
1727        let arrow_options =
1728            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1729        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1730            file.try_clone().unwrap(),
1731            arrow_options,
1732        )
1733        .unwrap();
1734
1735        let row_group_metadata = builder.metadata.row_group(0);
1736        for column in row_group_metadata.columns() {
1737            assert!(column.repetition_level_histogram().is_none());
1738            assert!(column.definition_level_histogram().is_none());
1739            assert!(column.unencoded_byte_array_data_bytes().is_none());
1740        }
1741
1742        // test skipping all but one column and converting to mask
1743        let arrow_options = ArrowReaderOptions::new()
1744            .with_encoding_stats_as_mask(true)
1745            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1746        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1747            file.try_clone().unwrap(),
1748            arrow_options,
1749        )
1750        .unwrap();
1751
1752        let row_group_metadata = builder.metadata.row_group(0);
1753        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1754            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1755            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1756            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1757        }
1758    }
1759
1760    #[test]
1761    fn test_arrow_reader_single_column() {
1762        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1763
1764        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1765        let original_schema = Arc::clone(builder.schema());
1766
1767        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1768        let reader = builder.with_projection(mask).build().unwrap();
1769
1770        // Verify that the schema was correctly parsed
1771        assert_eq!(1, reader.schema().fields().len());
1772        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1773    }
1774
1775    #[test]
1776    fn test_arrow_reader_single_column_by_name() {
1777        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1778
1779        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1780        let original_schema = Arc::clone(builder.schema());
1781
1782        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1783        let reader = builder.with_projection(mask).build().unwrap();
1784
1785        // Verify that the schema was correctly parsed
1786        assert_eq!(1, reader.schema().fields().len());
1787        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1788    }
1789
1790    #[test]
1791    fn test_null_column_reader_test() {
1792        let mut file = tempfile::tempfile().unwrap();
1793
1794        let schema = "
1795            message message {
1796                OPTIONAL INT32 int32;
1797            }
1798        ";
1799        let schema = Arc::new(parse_message_type(schema).unwrap());
1800
1801        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1802        generate_single_column_file_with_data::<Int32Type>(
1803            &[vec![], vec![]],
1804            Some(&def_levels),
1805            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1806            schema,
1807            Some(Field::new("int32", ArrowDataType::Null, true)),
1808            &Default::default(),
1809        )
1810        .unwrap();
1811
1812        file.rewind().unwrap();
1813
1814        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1815        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1816
1817        assert_eq!(batches.len(), 4);
1818        for batch in &batches[0..3] {
1819            assert_eq!(batch.num_rows(), 2);
1820            assert_eq!(batch.num_columns(), 1);
1821            assert_eq!(batch.column(0).null_count(), 2);
1822        }
1823
1824        assert_eq!(batches[3].num_rows(), 1);
1825        assert_eq!(batches[3].num_columns(), 1);
1826        assert_eq!(batches[3].column(0).null_count(), 1);
1827    }
1828
1829    #[test]
1830    fn test_primitive_single_column_reader_test() {
1831        run_single_column_reader_tests::<BoolType, _, BoolType>(
1832            2,
1833            ConvertedType::NONE,
1834            None,
1835            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1836            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1837        );
1838        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1839            2,
1840            ConvertedType::NONE,
1841            None,
1842            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1843            &[
1844                Encoding::PLAIN,
1845                Encoding::RLE_DICTIONARY,
1846                Encoding::DELTA_BINARY_PACKED,
1847                Encoding::BYTE_STREAM_SPLIT,
1848            ],
1849        );
1850        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1851            2,
1852            ConvertedType::NONE,
1853            None,
1854            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1855            &[
1856                Encoding::PLAIN,
1857                Encoding::RLE_DICTIONARY,
1858                Encoding::DELTA_BINARY_PACKED,
1859                Encoding::BYTE_STREAM_SPLIT,
1860            ],
1861        );
1862        run_single_column_reader_tests::<FloatType, _, FloatType>(
1863            2,
1864            ConvertedType::NONE,
1865            None,
1866            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1867            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1868        );
1869    }
1870
1871    #[test]
1872    fn test_unsigned_primitive_single_column_reader_test() {
1873        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1874            2,
1875            ConvertedType::UINT_32,
1876            Some(ArrowDataType::UInt32),
1877            |vals| {
1878                Arc::new(UInt32Array::from_iter(
1879                    vals.iter().map(|x| x.map(|x| x as u32)),
1880                ))
1881            },
1882            &[
1883                Encoding::PLAIN,
1884                Encoding::RLE_DICTIONARY,
1885                Encoding::DELTA_BINARY_PACKED,
1886            ],
1887        );
1888        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1889            2,
1890            ConvertedType::UINT_64,
1891            Some(ArrowDataType::UInt64),
1892            |vals| {
1893                Arc::new(UInt64Array::from_iter(
1894                    vals.iter().map(|x| x.map(|x| x as u64)),
1895                ))
1896            },
1897            &[
1898                Encoding::PLAIN,
1899                Encoding::RLE_DICTIONARY,
1900                Encoding::DELTA_BINARY_PACKED,
1901            ],
1902        );
1903    }
1904
1905    #[test]
1906    fn test_unsigned_roundtrip() {
1907        let schema = Arc::new(Schema::new(vec![
1908            Field::new("uint32", ArrowDataType::UInt32, true),
1909            Field::new("uint64", ArrowDataType::UInt64, true),
1910        ]));
1911
1912        let mut buf = Vec::with_capacity(1024);
1913        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1914
1915        let original = RecordBatch::try_new(
1916            schema,
1917            vec![
1918                Arc::new(UInt32Array::from_iter_values([
1919                    0,
1920                    i32::MAX as u32,
1921                    u32::MAX,
1922                ])),
1923                Arc::new(UInt64Array::from_iter_values([
1924                    0,
1925                    i64::MAX as u64,
1926                    u64::MAX,
1927                ])),
1928            ],
1929        )
1930        .unwrap();
1931
1932        writer.write(&original).unwrap();
1933        writer.close().unwrap();
1934
1935        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1936        let ret = reader.next().unwrap().unwrap();
1937        assert_eq!(ret, original);
1938
1939        // Check they can be downcast to the correct type
1940        ret.column(0)
1941            .as_any()
1942            .downcast_ref::<UInt32Array>()
1943            .unwrap();
1944
1945        ret.column(1)
1946            .as_any()
1947            .downcast_ref::<UInt64Array>()
1948            .unwrap();
1949    }
1950
1951    #[test]
1952    fn test_float16_roundtrip() -> Result<()> {
1953        let schema = Arc::new(Schema::new(vec![
1954            Field::new("float16", ArrowDataType::Float16, false),
1955            Field::new("float16-nullable", ArrowDataType::Float16, true),
1956        ]));
1957
1958        let mut buf = Vec::with_capacity(1024);
1959        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1960
1961        let original = RecordBatch::try_new(
1962            schema,
1963            vec![
1964                Arc::new(Float16Array::from_iter_values([
1965                    f16::EPSILON,
1966                    f16::MIN,
1967                    f16::MAX,
1968                    f16::NAN,
1969                    f16::INFINITY,
1970                    f16::NEG_INFINITY,
1971                    f16::ONE,
1972                    f16::NEG_ONE,
1973                    f16::ZERO,
1974                    f16::NEG_ZERO,
1975                    f16::E,
1976                    f16::PI,
1977                    f16::FRAC_1_PI,
1978                ])),
1979                Arc::new(Float16Array::from(vec![
1980                    None,
1981                    None,
1982                    None,
1983                    Some(f16::NAN),
1984                    Some(f16::INFINITY),
1985                    Some(f16::NEG_INFINITY),
1986                    None,
1987                    None,
1988                    None,
1989                    None,
1990                    None,
1991                    None,
1992                    Some(f16::FRAC_1_PI),
1993                ])),
1994            ],
1995        )?;
1996
1997        writer.write(&original)?;
1998        writer.close()?;
1999
2000        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2001        let ret = reader.next().unwrap()?;
2002        assert_eq!(ret, original);
2003
2004        // Ensure the columns can be downcast to the correct type
2005        ret.column(0).as_primitive::<Float16Type>();
2006        ret.column(1).as_primitive::<Float16Type>();
2007
2008        Ok(())
2009    }
2010
2011    #[test]
2012    fn test_time_utc_roundtrip() -> Result<()> {
2013        let schema = Arc::new(Schema::new(vec![
2014            Field::new(
2015                "time_millis",
2016                ArrowDataType::Time32(TimeUnit::Millisecond),
2017                true,
2018            )
2019            .with_metadata(HashMap::from_iter(vec![(
2020                "adjusted_to_utc".to_string(),
2021                "".to_string(),
2022            )])),
2023            Field::new(
2024                "time_micros",
2025                ArrowDataType::Time64(TimeUnit::Microsecond),
2026                true,
2027            )
2028            .with_metadata(HashMap::from_iter(vec![(
2029                "adjusted_to_utc".to_string(),
2030                "".to_string(),
2031            )])),
2032        ]));
2033
2034        let mut buf = Vec::with_capacity(1024);
2035        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2036
2037        let original = RecordBatch::try_new(
2038            schema,
2039            vec![
2040                Arc::new(Time32MillisecondArray::from(vec![
2041                    Some(-1),
2042                    Some(0),
2043                    Some(86_399_000),
2044                    Some(86_400_000),
2045                    Some(86_401_000),
2046                    None,
2047                ])),
2048                Arc::new(Time64MicrosecondArray::from(vec![
2049                    Some(-1),
2050                    Some(0),
2051                    Some(86_399 * 1_000_000),
2052                    Some(86_400 * 1_000_000),
2053                    Some(86_401 * 1_000_000),
2054                    None,
2055                ])),
2056            ],
2057        )?;
2058
2059        writer.write(&original)?;
2060        writer.close()?;
2061
2062        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2063        let ret = reader.next().unwrap()?;
2064        assert_eq!(ret, original);
2065
2066        // Ensure the columns can be downcast to the correct type
2067        ret.column(0).as_primitive::<Time32MillisecondType>();
2068        ret.column(1).as_primitive::<Time64MicrosecondType>();
2069
2070        Ok(())
2071    }
2072
2073    #[test]
2074    fn test_date32_roundtrip() -> Result<()> {
2075        use arrow_array::Date32Array;
2076
2077        let schema = Arc::new(Schema::new(vec![Field::new(
2078            "date32",
2079            ArrowDataType::Date32,
2080            false,
2081        )]));
2082
2083        let mut buf = Vec::with_capacity(1024);
2084
2085        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2086
2087        let original = RecordBatch::try_new(
2088            schema,
2089            vec![Arc::new(Date32Array::from(vec![
2090                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2091            ]))],
2092        )?;
2093
2094        writer.write(&original)?;
2095        writer.close()?;
2096
2097        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2098        let ret = reader.next().unwrap()?;
2099        assert_eq!(ret, original);
2100
2101        // Ensure the column can be downcast to the correct type
2102        ret.column(0).as_primitive::<Date32Type>();
2103
2104        Ok(())
2105    }
2106
2107    #[test]
2108    fn test_date64_roundtrip() -> Result<()> {
2109        use arrow_array::Date64Array;
2110
2111        let schema = Arc::new(Schema::new(vec![
2112            Field::new("small-date64", ArrowDataType::Date64, false),
2113            Field::new("big-date64", ArrowDataType::Date64, false),
2114            Field::new("invalid-date64", ArrowDataType::Date64, false),
2115        ]));
2116
2117        let mut default_buf = Vec::with_capacity(1024);
2118        let mut coerce_buf = Vec::with_capacity(1024);
2119
2120        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2121
2122        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2123        let mut coerce_writer =
2124            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2125
2126        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2127
2128        let original = RecordBatch::try_new(
2129            schema,
2130            vec![
2131                // small-date64
2132                Arc::new(Date64Array::from(vec![
2133                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2134                    -1_000 * NUM_MILLISECONDS_IN_DAY,
2135                    0,
2136                    1_000 * NUM_MILLISECONDS_IN_DAY,
2137                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
2138                ])),
2139                // big-date64
2140                Arc::new(Date64Array::from(vec![
2141                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2142                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2143                    0,
2144                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2145                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2146                ])),
2147                // invalid-date64
2148                Arc::new(Date64Array::from(vec![
2149                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2150                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2151                    1,
2152                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2153                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2154                ])),
2155            ],
2156        )?;
2157
2158        default_writer.write(&original)?;
2159        coerce_writer.write(&original)?;
2160
2161        default_writer.close()?;
2162        coerce_writer.close()?;
2163
2164        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2165        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2166
2167        let default_ret = default_reader.next().unwrap()?;
2168        let coerce_ret = coerce_reader.next().unwrap()?;
2169
2170        // Roundtrip should be successful when the default writer is used
2171        assert_eq!(default_ret, original);
2172
2173        // Only small-date64 should roundtrip successfully when the coerce_types writer is used
2174        assert_eq!(coerce_ret.column(0), original.column(0));
2175        assert_ne!(coerce_ret.column(1), original.column(1));
2176        assert_ne!(coerce_ret.column(2), original.column(2));
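        // (with `coerce_types`, Date64 values are stored as 32-bit DATE days, so values that are
        // not whole days or that fall outside the i32 day range cannot roundtrip exactly)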
2177
2178        // Ensure both can be downcast to the correct type
2179        default_ret.column(0).as_primitive::<Date64Type>();
2180        coerce_ret.column(0).as_primitive::<Date64Type>();
2181
2182        Ok(())
2183    }
2184    struct RandFixedLenGen {}
2185
2186    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2187        fn r#gen(len: i32) -> FixedLenByteArray {
2188            let mut v = vec![0u8; len as usize];
2189            rng().fill_bytes(&mut v);
2190            ByteArray::from(v).into()
2191        }
2192    }
2193
2194    #[test]
2195    fn test_fixed_length_binary_column_reader() {
2196        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2197            20,
2198            ConvertedType::NONE,
2199            None,
2200            |vals| {
2201                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2202                for val in vals {
2203                    match val {
2204                        Some(b) => builder.append_value(b).unwrap(),
2205                        None => builder.append_null(),
2206                    }
2207                }
2208                Arc::new(builder.finish())
2209            },
2210            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2211        );
2212    }
2213
2214    #[test]
2215    fn test_interval_day_time_column_reader() {
2216        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2217            12,
2218            ConvertedType::INTERVAL,
2219            None,
2220            |vals| {
2221                Arc::new(
2222                    vals.iter()
2223                        .map(|x| {
2224                            x.as_ref().map(|b| IntervalDayTime {
2225                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2226                                milliseconds: i32::from_le_bytes(
2227                                    b.as_ref()[8..12].try_into().unwrap(),
2228                                ),
2229                            })
2230                        })
2231                        .collect::<IntervalDayTimeArray>(),
2232                )
2233            },
2234            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2235        );
2236    }
2237
2238    #[test]
2239    fn test_int96_single_column_reader_test() {
2240        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2241
2242        type TypeHintAndConversionFunction =
2243            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2244
2245        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2246            // Test without a specified ArrowType hint.
2247            (None, |vals: &[Option<Int96>]| {
2248                Arc::new(TimestampNanosecondArray::from_iter(
2249                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2250                )) as ArrayRef
2251            }),
2252            // Test other TimeUnits as ArrowType hints.
2253            (
2254                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2255                |vals: &[Option<Int96>]| {
2256                    Arc::new(TimestampSecondArray::from_iter(
2257                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2258                    )) as ArrayRef
2259                },
2260            ),
2261            (
2262                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2263                |vals: &[Option<Int96>]| {
2264                    Arc::new(TimestampMillisecondArray::from_iter(
2265                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2266                    )) as ArrayRef
2267                },
2268            ),
2269            (
2270                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2271                |vals: &[Option<Int96>]| {
2272                    Arc::new(TimestampMicrosecondArray::from_iter(
2273                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2274                    )) as ArrayRef
2275                },
2276            ),
2277            (
2278                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2279                |vals: &[Option<Int96>]| {
2280                    Arc::new(TimestampNanosecondArray::from_iter(
2281                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2282                    )) as ArrayRef
2283                },
2284            ),
2285            // Test another timezone with TimeUnit as ArrowType hints.
2286            (
2287                Some(ArrowDataType::Timestamp(
2288                    TimeUnit::Second,
2289                    Some(Arc::from("-05:00")),
2290                )),
2291                |vals: &[Option<Int96>]| {
2292                    Arc::new(
2293                        TimestampSecondArray::from_iter(
2294                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2295                        )
2296                        .with_timezone("-05:00"),
2297                    ) as ArrayRef
2298                },
2299            ),
2300        ];
2301
2302        resolutions.iter().for_each(|(arrow_type, converter)| {
2303            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2304                2,
2305                ConvertedType::NONE,
2306                arrow_type.clone(),
2307                converter,
2308                encodings,
2309            );
2310        })
2311    }
2312
2313    #[test]
2314    fn test_int96_from_spark_file_with_provided_schema() {
2315        // int96_from_spark.parquet was written from Spark's microsecond timestamps, which trade
2316        // resolution for range compared to nanosecond timestamps. We must provide a schema with
2317        // microsecond resolution for the Parquet reader to interpret these values correctly.
2318        use arrow_schema::DataType::Timestamp;
2319        let test_data = arrow::util::test_util::parquet_test_data();
2320        let path = format!("{test_data}/int96_from_spark.parquet");
2321        let file = File::open(path).unwrap();
2322
2323        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2324            "a",
2325            Timestamp(TimeUnit::Microsecond, None),
2326            true,
2327        )]));
2328        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2329
2330        let mut record_reader =
2331            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2332                .unwrap()
2333                .build()
2334                .unwrap();
2335
2336        let batch = record_reader.next().unwrap().unwrap();
2337        assert_eq!(batch.num_columns(), 1);
2338        let column = batch.column(0);
2339        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2340
2341        let expected = Arc::new(Int64Array::from(vec![
2342            Some(1704141296123456),
2343            Some(1704070800000000),
2344            Some(253402225200000000),
2345            Some(1735599600000000),
2346            None,
2347            Some(9089380393200000000),
2348        ]));
2349
2350        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2351        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2352        // anyway, so this should be a zero-copy non-modifying cast.
2353
2354        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2355        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2356
2357        assert_eq!(casted_timestamps.len(), expected.len());
2358
2359        casted_timestamps
2360            .iter()
2361            .zip(expected.iter())
2362            .for_each(|(lhs, rhs)| {
2363                assert_eq!(lhs, rhs);
2364            });
2365    }
2366
2367    #[test]
2368    fn test_int96_from_spark_file_without_provided_schema() {
2369        // int96_from_spark.parquet was written from Spark's microsecond timestamps, which trade
2370        // resolution for range compared to nanosecond timestamps. Without a provided schema, some
2371        // values overflow when read at nanosecond resolution and result in garbage values.
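        // For example, 253402225200000000 microseconds (year 9999) scaled to nanoseconds is
        // roughly 2.5e20, which overflows i64 (i64::MAX is ~9.2e18) and wraps to a negative value.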
2372        use arrow_schema::DataType::Timestamp;
2373        let test_data = arrow::util::test_util::parquet_test_data();
2374        let path = format!("{test_data}/int96_from_spark.parquet");
2375        let file = File::open(path).unwrap();
2376
2377        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2378            .unwrap()
2379            .build()
2380            .unwrap();
2381
2382        let batch = record_reader.next().unwrap().unwrap();
2383        assert_eq!(batch.num_columns(), 1);
2384        let column = batch.column(0);
2385        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2386
2387        let expected = Arc::new(Int64Array::from(vec![
2388            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2389            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2390            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2391            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2392            None,
2393            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2394        ]));
2395
2396        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2397        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2398        // anyway, so this should be a zero-copy non-modifying cast.
2399
2400        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2401        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2402
2403        assert_eq!(casted_timestamps.len(), expected.len());
2404
2405        casted_timestamps
2406            .iter()
2407            .zip(expected.iter())
2408            .for_each(|(lhs, rhs)| {
2409                assert_eq!(lhs, rhs);
2410            });
2411    }
2412
2413    struct RandUtf8Gen {}
2414
2415    impl RandGen<ByteArrayType> for RandUtf8Gen {
2416        fn r#gen(len: i32) -> ByteArray {
2417            Int32Type::r#gen(len).to_string().as_str().into()
2418        }
2419    }
2420
2421    #[test]
2422    fn test_utf8_single_column_reader_test() {
2423        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2424            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2425                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2426            })))
2427        }
2428
2429        let encodings = &[
2430            Encoding::PLAIN,
2431            Encoding::RLE_DICTIONARY,
2432            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2433            Encoding::DELTA_BYTE_ARRAY,
2434        ];
2435
2436        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2437            2,
2438            ConvertedType::NONE,
2439            None,
2440            |vals| {
2441                Arc::new(BinaryArray::from_iter(
2442                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2443                ))
2444            },
2445            encodings,
2446        );
2447
2448        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2449            2,
2450            ConvertedType::UTF8,
2451            None,
2452            string_converter::<i32>,
2453            encodings,
2454        );
2455
2456        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2457            2,
2458            ConvertedType::UTF8,
2459            Some(ArrowDataType::Utf8),
2460            string_converter::<i32>,
2461            encodings,
2462        );
2463
2464        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2465            2,
2466            ConvertedType::UTF8,
2467            Some(ArrowDataType::LargeUtf8),
2468            string_converter::<i64>,
2469            encodings,
2470        );
2471
2472        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2473        for key in &small_key_types {
2474            for encoding in encodings {
2475                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2476                opts.encoding = *encoding;
2477
2478                let data_type =
2479                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2480
2481                // Cannot run full test suite as keys overflow, run small test instead
2482                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2483                    opts,
2484                    2,
2485                    ConvertedType::UTF8,
2486                    Some(data_type.clone()),
2487                    move |vals| {
2488                        let vals = string_converter::<i32>(vals);
2489                        arrow::compute::cast(&vals, &data_type).unwrap()
2490                    },
2491                );
2492            }
2493        }
2494
2495        let key_types = [
2496            ArrowDataType::Int16,
2497            ArrowDataType::UInt16,
2498            ArrowDataType::Int32,
2499            ArrowDataType::UInt32,
2500            ArrowDataType::Int64,
2501            ArrowDataType::UInt64,
2502        ];
2503
2504        for key in &key_types {
2505            let data_type =
2506                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2507
2508            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2509                2,
2510                ConvertedType::UTF8,
2511                Some(data_type.clone()),
2512                move |vals| {
2513                    let vals = string_converter::<i32>(vals);
2514                    arrow::compute::cast(&vals, &data_type).unwrap()
2515                },
2516                encodings,
2517            );
2518
2519            let data_type = ArrowDataType::Dictionary(
2520                Box::new(key.clone()),
2521                Box::new(ArrowDataType::LargeUtf8),
2522            );
2523
2524            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2525                2,
2526                ConvertedType::UTF8,
2527                Some(data_type.clone()),
2528                move |vals| {
2529                    let vals = string_converter::<i64>(vals);
2530                    arrow::compute::cast(&vals, &data_type).unwrap()
2531                },
2532                encodings,
2533            );
2534        }
2535    }
2536
2537    #[test]
2538    fn test_decimal_nullable_struct() {
2539        let decimals = Decimal256Array::from_iter_values(
2540            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2541        );
2542
2543        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2544            "decimals",
2545            decimals.data_type().clone(),
2546            false,
2547        )])))
2548        .len(8)
2549        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2550        .child_data(vec![decimals.into_data()])
2551        .build()
2552        .unwrap();
2553
2554        let written =
2555            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2556                .unwrap();
2557
2558        let mut buffer = Vec::with_capacity(1024);
2559        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2560        writer.write(&written).unwrap();
2561        writer.close().unwrap();
2562
2563        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2564            .unwrap()
2565            .collect::<Result<Vec<_>, _>>()
2566            .unwrap();
2567
2568        assert_eq!(&written.slice(0, 3), &read[0]);
2569        assert_eq!(&written.slice(3, 3), &read[1]);
2570        assert_eq!(&written.slice(6, 2), &read[2]);
2571    }
2572
2573    #[test]
2574    fn test_int32_nullable_struct() {
2575        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2576        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2577            "int32",
2578            int32.data_type().clone(),
2579            false,
2580        )])))
2581        .len(8)
2582        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2583        .child_data(vec![int32.into_data()])
2584        .build()
2585        .unwrap();
2586
2587        let written =
2588            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2589                .unwrap();
2590
2591        let mut buffer = Vec::with_capacity(1024);
2592        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2593        writer.write(&written).unwrap();
2594        writer.close().unwrap();
2595
2596        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2597            .unwrap()
2598            .collect::<Result<Vec<_>, _>>()
2599            .unwrap();
2600
2601        assert_eq!(&written.slice(0, 3), &read[0]);
2602        assert_eq!(&written.slice(3, 3), &read[1]);
2603        assert_eq!(&written.slice(6, 2), &read[2]);
2604    }
2605
2606    #[test]
2607    fn test_decimal_list() {
2608        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2609
2610        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2611        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2612            decimals.data_type().clone(),
2613            false,
2614        ))))
2615        .len(7)
2616        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2617        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2618        .child_data(vec![decimals.into_data()])
2619        .build()
2620        .unwrap();
2621
2622        let written =
2623            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2624                .unwrap();
2625
2626        let mut buffer = Vec::with_capacity(1024);
2627        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2628        writer.write(&written).unwrap();
2629        writer.close().unwrap();
2630
2631        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2632            .unwrap()
2633            .collect::<Result<Vec<_>, _>>()
2634            .unwrap();
2635
2636        assert_eq!(&written.slice(0, 3), &read[0]);
2637        assert_eq!(&written.slice(3, 3), &read[1]);
2638        assert_eq!(&written.slice(6, 1), &read[2]);
2639    }
2640
2641    #[test]
2642    fn test_read_decimal_file() {
2643        use arrow_array::Decimal128Array;
2644        let testdata = arrow::util::test_util::parquet_test_data();
2645        let file_variants = vec![
2646            ("byte_array", 4),
2647            ("fixed_length", 25),
2648            ("int32", 4),
2649            ("int64", 10),
2650        ];
2651        for (prefix, target_precision) in file_variants {
2652            let path = format!("{testdata}/{prefix}_decimal.parquet");
2653            let file = File::open(path).unwrap();
2654            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2655
2656            let batch = record_reader.next().unwrap().unwrap();
2657            assert_eq!(batch.num_rows(), 24);
2658            let col = batch
2659                .column(0)
2660                .as_any()
2661                .downcast_ref::<Decimal128Array>()
2662                .unwrap();
2663
2664            let expected = 1..25;
2665
2666            assert_eq!(col.precision(), target_precision);
2667            assert_eq!(col.scale(), 2);
2668
2669            for (i, v) in expected.enumerate() {
2670                assert_eq!(col.value(i), v * 100_i128);
2671            }
2672        }
2673    }
2674
2675    #[test]
2676    fn test_read_float16_nonzeros_file() {
2677        use arrow_array::Float16Array;
2678        let testdata = arrow::util::test_util::parquet_test_data();
2679        // see https://github.com/apache/parquet-testing/pull/40
2680        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2681        let file = File::open(path).unwrap();
2682        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2683
2684        let batch = record_reader.next().unwrap().unwrap();
2685        assert_eq!(batch.num_rows(), 8);
2686        let col = batch
2687            .column(0)
2688            .as_any()
2689            .downcast_ref::<Float16Array>()
2690            .unwrap();
2691
2692        let f16_two = f16::ONE + f16::ONE;
2693
2694        assert_eq!(col.null_count(), 1);
2695        assert!(col.is_null(0));
2696        assert_eq!(col.value(1), f16::ONE);
2697        assert_eq!(col.value(2), -f16_two);
2698        assert!(col.value(3).is_nan());
2699        assert_eq!(col.value(4), f16::ZERO);
2700        assert!(col.value(4).is_sign_positive());
2701        assert_eq!(col.value(5), f16::NEG_ONE);
2702        assert_eq!(col.value(6), f16::NEG_ZERO);
2703        assert!(col.value(6).is_sign_negative());
2704        assert_eq!(col.value(7), f16_two);
2705    }
2706
2707    #[test]
2708    fn test_read_float16_zeros_file() {
2709        use arrow_array::Float16Array;
2710        let testdata = arrow::util::test_util::parquet_test_data();
2711        // see https://github.com/apache/parquet-testing/pull/40
2712        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2713        let file = File::open(path).unwrap();
2714        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2715
2716        let batch = record_reader.next().unwrap().unwrap();
2717        assert_eq!(batch.num_rows(), 3);
2718        let col = batch
2719            .column(0)
2720            .as_any()
2721            .downcast_ref::<Float16Array>()
2722            .unwrap();
2723
2724        assert_eq!(col.null_count(), 1);
2725        assert!(col.is_null(0));
2726        assert_eq!(col.value(1), f16::ZERO);
2727        assert!(col.value(1).is_sign_positive());
2728        assert!(col.value(2).is_nan());
2729    }
2730
2731    #[test]
2732    fn test_read_float32_float64_byte_stream_split() {
2733        let path = format!(
2734            "{}/byte_stream_split.zstd.parquet",
2735            arrow::util::test_util::parquet_test_data(),
2736        );
2737        let file = File::open(path).unwrap();
2738        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2739
2740        let mut row_count = 0;
2741        for batch in record_reader {
2742            let batch = batch.unwrap();
2743            row_count += batch.num_rows();
2744            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2745            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2746
2747            // This file contains floats from a standard normal distribution
2748            for &x in f32_col.values() {
2749                assert!(x > -10.0);
2750                assert!(x < 10.0);
2751            }
2752            for &x in f64_col.values() {
2753                assert!(x > -10.0);
2754                assert!(x < 10.0);
2755            }
2756        }
2757        assert_eq!(row_count, 300);
2758    }
2759
2760    #[test]
2761    fn test_read_extended_byte_stream_split() {
2762        let path = format!(
2763            "{}/byte_stream_split_extended.gzip.parquet",
2764            arrow::util::test_util::parquet_test_data(),
2765        );
2766        let file = File::open(path).unwrap();
2767        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2768
2769        let mut row_count = 0;
2770        for batch in record_reader {
2771            let batch = batch.unwrap();
2772            row_count += batch.num_rows();
2773
2774            // 0,1 are f16
2775            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2776            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2777            assert_eq!(f16_col.len(), f16_bss.len());
2778            f16_col
2779                .iter()
2780                .zip(f16_bss.iter())
2781                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2782
2783            // 2,3 are f32
2784            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2785            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2786            assert_eq!(f32_col.len(), f32_bss.len());
2787            f32_col
2788                .iter()
2789                .zip(f32_bss.iter())
2790                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2791
2792            // 4,5 are f64
2793            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2794            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2795            assert_eq!(f64_col.len(), f64_bss.len());
2796            f64_col
2797                .iter()
2798                .zip(f64_bss.iter())
2799                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2800
2801            // 6,7 are i32
2802            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2803            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2804            assert_eq!(i32_col.len(), i32_bss.len());
2805            i32_col
2806                .iter()
2807                .zip(i32_bss.iter())
2808                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2809
2810            // 8,9 are i64
2811            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2812            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2813            assert_eq!(i64_col.len(), i64_bss.len());
2814            i64_col
2815                .iter()
2816                .zip(i64_bss.iter())
2817                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2818
2819            // 10,11 are FLBA(5)
2820            let flba_col = batch.column(10).as_fixed_size_binary();
2821            let flba_bss = batch.column(11).as_fixed_size_binary();
2822            assert_eq!(flba_col.len(), flba_bss.len());
2823            flba_col
2824                .iter()
2825                .zip(flba_bss.iter())
2826                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2827
2828            // 12,13 are FLBA(4) (decimal(7,3))
2829            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2830            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2831            assert_eq!(dec_col.len(), dec_bss.len());
2832            dec_col
2833                .iter()
2834                .zip(dec_bss.iter())
2835                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2836        }
2837        assert_eq!(row_count, 200);
2838    }
2839
2840    #[test]
2841    fn test_read_incorrect_map_schema_file() {
2842        let testdata = arrow::util::test_util::parquet_test_data();
2843        // see https://github.com/apache/parquet-testing/pull/47
2844        let path = format!("{testdata}/incorrect_map_schema.parquet");
2845        let file = File::open(path).unwrap();
2846        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2847
2848        let batch = record_reader.next().unwrap().unwrap();
2849        assert_eq!(batch.num_rows(), 1);
2850
2851        let expected_schema = Schema::new(vec![Field::new(
2852            "my_map",
2853            ArrowDataType::Map(
2854                Arc::new(Field::new(
2855                    "key_value",
2856                    ArrowDataType::Struct(Fields::from(vec![
2857                        Field::new("key", ArrowDataType::Utf8, false),
2858                        Field::new("value", ArrowDataType::Utf8, true),
2859                    ])),
2860                    false,
2861                )),
2862                false,
2863            ),
2864            true,
2865        )]);
2866        assert_eq!(batch.schema().as_ref(), &expected_schema);
2867
2868        assert_eq!(batch.num_rows(), 1);
2869        assert_eq!(batch.column(0).null_count(), 0);
2870        assert_eq!(
2871            batch.column(0).as_map().keys().as_ref(),
2872            &StringArray::from(vec!["parent", "name"])
2873        );
2874        assert_eq!(
2875            batch.column(0).as_map().values().as_ref(),
2876            &StringArray::from(vec!["another", "report"])
2877        );
2878    }
2879
2880    #[test]
2881    fn test_read_dict_fixed_size_binary() {
2882        let schema = Arc::new(Schema::new(vec![Field::new(
2883            "a",
2884            ArrowDataType::Dictionary(
2885                Box::new(ArrowDataType::UInt8),
2886                Box::new(ArrowDataType::FixedSizeBinary(8)),
2887            ),
2888            true,
2889        )]));
2890        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2891        let values = FixedSizeBinaryArray::try_from_iter(
2892            vec![
2893                (0u8..8u8).collect::<Vec<u8>>(),
2894                (24u8..32u8).collect::<Vec<u8>>(),
2895            ]
2896            .into_iter(),
2897        )
2898        .unwrap();
2899        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2900        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2901
2902        let mut buffer = Vec::with_capacity(1024);
2903        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2904        writer.write(&batch).unwrap();
2905        writer.close().unwrap();
2906        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2907            .unwrap()
2908            .collect::<Result<Vec<_>, _>>()
2909            .unwrap();
2910
2911        assert_eq!(read.len(), 1);
2912        assert_eq!(&batch, &read[0])
2913    }
2914
2915    #[test]
2916    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2917        // The `StructArrayReader` checks the definition and repetition levels of the first
2918        // child column in the struct to determine nullability for the struct. If the first
2919        // column is being read by a `ByteArrayDictionaryReader` we need to ensure that the
2920        // nullability is interpreted correctly from the rep/def level buffers managed by
2921        // that array reader.
2922
2923        let struct_fields = Fields::from(vec![
2924            Field::new(
2925                "city",
2926                ArrowDataType::Dictionary(
2927                    Box::new(ArrowDataType::UInt8),
2928                    Box::new(ArrowDataType::Utf8),
2929                ),
2930                true,
2931            ),
2932            Field::new("name", ArrowDataType::Utf8, true),
2933        ]);
2934        let schema = Arc::new(Schema::new(vec![Field::new(
2935            "items",
2936            ArrowDataType::Struct(struct_fields.clone()),
2937            true,
2938        )]));
2939
2940        let items_arr = StructArray::new(
2941            struct_fields,
2942            vec![
2943                Arc::new(DictionaryArray::new(
2944                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2945                    Arc::new(StringArray::from_iter_values(vec![
2946                        "quebec",
2947                        "fredericton",
2948                        "halifax",
2949                    ])),
2950                )),
2951                Arc::new(StringArray::from_iter_values(vec![
2952                    "albert", "terry", "lance", "", "tim",
2953                ])),
2954            ],
2955            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2956        );
2957
2958        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2959        let mut buffer = Vec::with_capacity(1024);
2960        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2961        writer.write(&batch).unwrap();
2962        writer.close().unwrap();
2963        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2964            .unwrap()
2965            .collect::<Result<Vec<_>, _>>()
2966            .unwrap();
2967
2968        assert_eq!(read.len(), 1);
2969        assert_eq!(&batch, &read[0])
2970    }
2971
2972    /// Parameters for single_column_reader_test
2973    #[derive(Clone)]
2974    struct TestOptions {
2975        /// Number of row groups to write to parquet (total rows written =
2976        /// num_row_groups * num_rows)
2977        num_row_groups: usize,
2978        /// Number of rows per row group
2979        num_rows: usize,
2980        /// Size of batches to read back
2981        record_batch_size: usize,
2982        /// Percentage of nulls in the column, or None if the column is required
2983        null_percent: Option<usize>,
2984        /// Set write batch size
2985        ///
2986        /// This is the number of rows that are written at once to a page and
2987        /// therefore acts as a bound on the page granularity of a row group
2988        write_batch_size: usize,
2989        /// Maximum size of page in bytes
2990        max_data_page_size: usize,
2991        /// Maximum size of dictionary page in bytes
2992        max_dict_page_size: usize,
2993        /// Writer version
2994        writer_version: WriterVersion,
2995        /// Enabled statistics
2996        enabled_statistics: EnabledStatistics,
2997        /// Encoding
2998        encoding: Encoding,
2999        /// row selections and total selected row count
3000        row_selections: Option<(RowSelection, usize)>,
3001        /// row filter
3002        row_filter: Option<Vec<bool>>,
3003        /// limit
3004        limit: Option<usize>,
3005        /// offset
3006        offset: Option<usize>,
3007    }
3008
3009    /// Manually implement `Debug` to avoid printing the entire contents of `row_selections` and `row_filter`
3010    impl std::fmt::Debug for TestOptions {
3011        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3012            f.debug_struct("TestOptions")
3013                .field("num_row_groups", &self.num_row_groups)
3014                .field("num_rows", &self.num_rows)
3015                .field("record_batch_size", &self.record_batch_size)
3016                .field("null_percent", &self.null_percent)
3017                .field("write_batch_size", &self.write_batch_size)
3018                .field("max_data_page_size", &self.max_data_page_size)
3019                .field("max_dict_page_size", &self.max_dict_page_size)
3020                .field("writer_version", &self.writer_version)
3021                .field("enabled_statistics", &self.enabled_statistics)
3022                .field("encoding", &self.encoding)
3023                .field("row_selections", &self.row_selections.is_some())
3024                .field("row_filter", &self.row_filter.is_some())
3025                .field("limit", &self.limit)
3026                .field("offset", &self.offset)
3027                .finish()
3028        }
3029    }
3030
3031    impl Default for TestOptions {
3032        fn default() -> Self {
3033            Self {
3034                num_row_groups: 2,
3035                num_rows: 100,
3036                record_batch_size: 15,
3037                null_percent: None,
3038                write_batch_size: 64,
3039                max_data_page_size: 1024 * 1024,
3040                max_dict_page_size: 1024 * 1024,
3041                writer_version: WriterVersion::PARQUET_1_0,
3042                enabled_statistics: EnabledStatistics::Page,
3043                encoding: Encoding::PLAIN,
3044                row_selections: None,
3045                row_filter: None,
3046                limit: None,
3047                offset: None,
3048            }
3049        }
3050    }
3051
3052    impl TestOptions {
3053        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3054            Self {
3055                num_row_groups,
3056                num_rows,
3057                record_batch_size,
3058                ..Default::default()
3059            }
3060        }
3061
3062        fn with_null_percent(self, null_percent: usize) -> Self {
3063            Self {
3064                null_percent: Some(null_percent),
3065                ..self
3066            }
3067        }
3068
3069        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3070            Self {
3071                max_data_page_size,
3072                ..self
3073            }
3074        }
3075
3076        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3077            Self {
3078                max_dict_page_size,
3079                ..self
3080            }
3081        }
3082
3083        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3084            Self {
3085                enabled_statistics,
3086                ..self
3087            }
3088        }
3089
3090        fn with_row_selections(self) -> Self {
3091            assert!(self.row_filter.is_none(), "Must set row selection first");
3092
3093            let mut rng = rng();
3094            let step = rng.random_range(self.record_batch_size..self.num_rows);
3095            let row_selections = create_test_selection(
3096                step,
3097                self.num_row_groups * self.num_rows,
3098                rng.random::<bool>(),
3099            );
3100            Self {
3101                row_selections: Some(row_selections),
3102                ..self
3103            }
3104        }
3105
3106        fn with_row_filter(self) -> Self {
3107            let row_count = match &self.row_selections {
3108                Some((_, count)) => *count,
3109                None => self.num_row_groups * self.num_rows,
3110            };
3111
3112            let mut rng = rng();
3113            Self {
3114                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3115                ..self
3116            }
3117        }
3118
3119        fn with_limit(self, limit: usize) -> Self {
3120            Self {
3121                limit: Some(limit),
3122                ..self
3123            }
3124        }
3125
3126        fn with_offset(self, offset: usize) -> Self {
3127            Self {
3128                offset: Some(offset),
3129                ..self
3130            }
3131        }
3132
3133        fn writer_props(&self) -> WriterProperties {
3134            let builder = WriterProperties::builder()
3135                .set_data_page_size_limit(self.max_data_page_size)
3136                .set_write_batch_size(self.write_batch_size)
3137                .set_writer_version(self.writer_version)
3138                .set_statistics_enabled(self.enabled_statistics);
3139
3140            let builder = match self.encoding {
3141                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3142                    .set_dictionary_enabled(true)
3143                    .set_dictionary_page_size_limit(self.max_dict_page_size),
3144                _ => builder
3145                    .set_dictionary_enabled(false)
3146                    .set_encoding(self.encoding),
3147            };
3148
3149            builder.build()
3150        }
3151    }
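    // Usage sketch (illustrative, not an actual case from this suite): the helpers
    // above are chained builder-style and each one consumes `self`, so a test case
    // is assembled roughly like
    //
    //     let opts = TestOptions::new(2, 256, 93) // 2 row groups, 256 rows each, batch size 93
    //         .with_null_percent(25)
    //         .with_row_selections() // must precede with_row_filter (see the assert above)
    //         .with_row_filter()
    //         .with_limit(10);
    //
    // before being handed to `single_column_reader_test` below.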
3152
3153    /// Create parquet files and then read them back using
3154    /// `ParquetRecordBatchReaderBuilder` with a standard set of parameter
3155    /// combinations.
3156    ///
3157    /// `rand_max` represents the maximum size of value to pass to the
3158    /// value generator
3159    fn run_single_column_reader_tests<T, F, G>(
3160        rand_max: i32,
3161        converted_type: ConvertedType,
3162        arrow_type: Option<ArrowDataType>,
3163        converter: F,
3164        encodings: &[Encoding],
3165    ) where
3166        T: DataType,
3167        G: RandGen<T>,
3168        F: Fn(&[Option<T::T>]) -> ArrayRef,
3169    {
3170        let all_options = vec![
3171            // choose record_batch_size (15) so batches cross row
3172            // group boundaries (2 row groups of 100 rows each).
3173            TestOptions::new(2, 100, 15),
3174            // choose record_batch_size (5) so batches sometimes fall
3175            // on row group boundaries (3 row groups of 25 rows each).
3176            // Tests buffer
3177            // refilling edge cases.
3178            TestOptions::new(3, 25, 5),
3179            // Choose record_batch_size (25) so batch boundaries never
3180            // straddle a row group boundary (100 is a multiple of 25).
3181            // Tests buffer refilling edge cases.
3182            TestOptions::new(4, 100, 25),
3183            // Set maximum page size so row groups have multiple pages
3184            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3185            // Set small dictionary page size to test dictionary fallback
3186            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3187            // Test optional but with no nulls
3188            TestOptions::new(2, 256, 127).with_null_percent(0),
3189            // Test optional with nulls
3190            TestOptions::new(2, 256, 93).with_null_percent(25),
3191            // Test with limit of 0
3192            TestOptions::new(4, 100, 25).with_limit(0),
3193            // Test with limit of 50
3194            TestOptions::new(4, 100, 25).with_limit(50),
3195            // Test with limit equal to number of rows
3196            TestOptions::new(4, 100, 25).with_limit(100),
3197            // Test with limit larger than number of rows
3198            TestOptions::new(4, 100, 25).with_limit(101),
3199            // Test with limit + offset less than number of rows
3200            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3201            // Test with limit + offset equal to number of rows
3202            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3203            // Test with limit + offset larger than number of rows
3204            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3205            // Test with no page-level statistics
3206            TestOptions::new(2, 256, 91)
3207                .with_null_percent(25)
3208                .with_enabled_statistics(EnabledStatistics::Chunk),
3209            // Test with no statistics
3210            TestOptions::new(2, 256, 91)
3211                .with_null_percent(25)
3212                .with_enabled_statistics(EnabledStatistics::None),
3213            // Test with all null
3214            TestOptions::new(2, 128, 91)
3215                .with_null_percent(100)
3216                .with_enabled_statistics(EnabledStatistics::None),
3217            // Test skip
3218
3219            // choose record_batch_size (15) so batches cross row
3220            // group boundaries (2 row groups of 100 rows each).
3221            TestOptions::new(2, 100, 15).with_row_selections(),
3222            // choose record_batch_size (5) so batches sometimes fall
3223            // on row group boundaries (3 row groups of 25 rows each).
3224            // Tests buffer
3225            // refilling edge cases.
3226            TestOptions::new(3, 25, 5).with_row_selections(),
3227            // Choose record_batch_size (25) so batch boundaries never
3228            // straddle a row group boundary (100 is a multiple of 25).
3229            // Tests buffer refilling edge cases.
3230            TestOptions::new(4, 100, 25).with_row_selections(),
3231            // Set maximum page size so row groups have multiple pages
3232            TestOptions::new(3, 256, 73)
3233                .with_max_data_page_size(128)
3234                .with_row_selections(),
3235            // Set small dictionary page size to test dictionary fallback
3236            TestOptions::new(3, 256, 57)
3237                .with_max_dict_page_size(128)
3238                .with_row_selections(),
3239            // Test optional but with no nulls
3240            TestOptions::new(2, 256, 127)
3241                .with_null_percent(0)
3242                .with_row_selections(),
3243            // Test optional with nulls
3244            TestOptions::new(2, 256, 93)
3245                .with_null_percent(25)
3246                .with_row_selections(),
3247            // Test optional with nulls
3248            TestOptions::new(2, 256, 93)
3249                .with_null_percent(25)
3250                .with_row_selections()
3251                .with_limit(10),
3252            // Test optional with nulls
3253            TestOptions::new(2, 256, 93)
3254                .with_null_percent(25)
3255                .with_row_selections()
3256                .with_offset(20)
3257                .with_limit(10),
3258            // Test filter
3259
3260            // Test with row filter
3261            TestOptions::new(4, 100, 25).with_row_filter(),
3262            // Test with row selection and row filter
3263            TestOptions::new(4, 100, 25)
3264                .with_row_selections()
3265                .with_row_filter(),
3266            // Test with nulls and row filter
3267            TestOptions::new(2, 256, 93)
3268                .with_null_percent(25)
3269                .with_max_data_page_size(10)
3270                .with_row_filter(),
3271            // Test with nulls and row filter and small pages
3272            TestOptions::new(2, 256, 93)
3273                .with_null_percent(25)
3274                .with_max_data_page_size(10)
3275                .with_row_selections()
3276                .with_row_filter(),
3277            // Test with row selection and no offset index and small pages
3278            TestOptions::new(2, 256, 93)
3279                .with_enabled_statistics(EnabledStatistics::None)
3280                .with_max_data_page_size(10)
3281                .with_row_selections(),
3282        ];
3283
3284        all_options.into_iter().for_each(|opts| {
3285            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3286                for encoding in encodings {
3287                    let opts = TestOptions {
3288                        writer_version,
3289                        encoding: *encoding,
3290                        ..opts.clone()
3291                    };
3292
3293                    single_column_reader_test::<T, _, G>(
3294                        opts,
3295                        rand_max,
3296                        converted_type,
3297                        arrow_type.clone(),
3298                        &converter,
3299                    )
3300                }
3301            }
3302        });
3303    }
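    // Hypothetical invocation (for orientation only; the concrete tests elsewhere
    // in this module supply their own types and converters): INT32 data could be
    // exercised roughly like
    //
    //     run_single_column_reader_tests::<Int32Type, _, Int32Type>(
    //         4,
    //         ConvertedType::NONE,
    //         None,
    //         |vals| Arc::new(Int32Array::from_iter(vals.iter().copied())) as ArrayRef,
    //         &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    //     );
    //
    // where `Int32Type` also serves as the `RandGen` implementation and the
    // closure converts the generated parquet values into the expected Arrow array.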
3304
3305    /// Create a parquet file and then read it back using
3306    /// `ParquetRecordBatchReaderBuilder` with the parameters described in
3307    /// `opts`.
3308    fn single_column_reader_test<T, F, G>(
3309        opts: TestOptions,
3310        rand_max: i32,
3311        converted_type: ConvertedType,
3312        arrow_type: Option<ArrowDataType>,
3313        converter: F,
3314    ) where
3315        T: DataType,
3316        G: RandGen<T>,
3317        F: Fn(&[Option<T::T>]) -> ArrayRef,
3318    {
3319        // Print out options to facilitate debugging failures on CI
3320        println!(
3321            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3322            T::get_physical_type(),
3323            converted_type,
3324            arrow_type,
3325            opts
3326        );
3327
3328        // Generate def_levels according to null_percent
3329        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3330            Some(null_percent) => {
3331                let mut rng = rng();
3332
3333                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3334                    .map(|_| {
3335                        std::iter::from_fn(|| {
3336                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3337                        })
3338                        .take(opts.num_rows)
3339                        .collect()
3340                    })
3341                    .collect();
3342                (Repetition::OPTIONAL, Some(def_levels))
3343            }
3344            None => (Repetition::REQUIRED, None),
3345        };
3346
3347        // Generate random table data
3348        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3349            .map(|idx| {
3350                let null_count = match def_levels.as_ref() {
3351                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3352                    None => 0,
3353                };
3354                G::gen_vec(rand_max, opts.num_rows - null_count)
3355            })
3356            .collect();
3357
3358        let len = match T::get_physical_type() {
3359            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3360            crate::basic::Type::INT96 => 12,
3361            _ => -1,
3362        };
3363
3364        let fields = vec![Arc::new(
3365            Type::primitive_type_builder("leaf", T::get_physical_type())
3366                .with_repetition(repetition)
3367                .with_converted_type(converted_type)
3368                .with_length(len)
3369                .build()
3370                .unwrap(),
3371        )];
3372
3373        let schema = Arc::new(
3374            Type::group_type_builder("test_schema")
3375                .with_fields(fields)
3376                .build()
3377                .unwrap(),
3378        );
3379
3380        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3381
3382        let mut file = tempfile::tempfile().unwrap();
3383
3384        generate_single_column_file_with_data::<T>(
3385            &values,
3386            def_levels.as_ref(),
3387            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3388            schema,
3389            arrow_field,
3390            &opts,
3391        )
3392        .unwrap();
3393
3394        file.rewind().unwrap();
3395
3396        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3397            opts.enabled_statistics == EnabledStatistics::Page,
3398        ));
3399
3400        let mut builder =
3401            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3402
3403        let expected_data = match opts.row_selections {
3404            Some((selections, row_count)) => {
3405                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3406
3407                let mut skip_data: Vec<Option<T::T>> = vec![];
3408                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3409                for select in dequeue {
3410                    if select.skip {
3411                        without_skip_data.drain(0..select.row_count);
3412                    } else {
3413                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3414                    }
3415                }
3416                builder = builder.with_row_selection(selections);
3417
3418                assert_eq!(skip_data.len(), row_count);
3419                skip_data
3420            }
3421            None => {
3422                // Get flattened table data
3423                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3424                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3425                expected_data
3426            }
3427        };
3428
3429        let mut expected_data = match opts.row_filter {
3430            Some(filter) => {
3431                let expected_data = expected_data
3432                    .into_iter()
3433                    .zip(filter.iter())
3434                    .filter_map(|(d, f)| f.then(|| d))
3435                    .collect();
3436
3437                let mut filter_offset = 0;
3438                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3439                    ProjectionMask::all(),
3440                    move |b| {
3441                        let array = BooleanArray::from_iter(
3442                            filter
3443                                .iter()
3444                                .skip(filter_offset)
3445                                .take(b.num_rows())
3446                                .map(|x| Some(*x)),
3447                        );
3448                        filter_offset += b.num_rows();
3449                        Ok(array)
3450                    },
3451                ))]);
3452
3453                builder = builder.with_row_filter(filter);
3454                expected_data
3455            }
3456            None => expected_data,
3457        };
3458
3459        if let Some(offset) = opts.offset {
3460            builder = builder.with_offset(offset);
3461            expected_data = expected_data.into_iter().skip(offset).collect();
3462        }
3463
3464        if let Some(limit) = opts.limit {
3465            builder = builder.with_limit(limit);
3466            expected_data = expected_data.into_iter().take(limit).collect();
3467        }
3468
3469        let mut record_reader = builder
3470            .with_batch_size(opts.record_batch_size)
3471            .build()
3472            .unwrap();
3473
3474        let mut total_read = 0;
3475        loop {
3476            let maybe_batch = record_reader.next();
3477            if total_read < expected_data.len() {
3478                let end = min(total_read + opts.record_batch_size, expected_data.len());
3479                let batch = maybe_batch.unwrap().unwrap();
3480                assert_eq!(end - total_read, batch.num_rows());
3481
3482                let a = converter(&expected_data[total_read..end]);
3483                let b = batch.column(0);
3484
3485                assert_eq!(a.data_type(), b.data_type());
3486                assert_eq!(a.to_data(), b.to_data());
3487                assert_eq!(
3488                    a.as_any().type_id(),
3489                    b.as_any().type_id(),
3490                    "incorrect type ids"
3491                );
3492
3493                total_read = end;
3494            } else {
3495                assert!(maybe_batch.is_none());
3496                break;
3497            }
3498        }
3499    }
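    // Illustrative arithmetic for the read loop above: with 230 expected rows and
    // a record_batch_size of 93 the reader must return batches of 93, 93 and 44
    // rows, after which `next()` must yield `None`.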
3500
3501    fn gen_expected_data<T: DataType>(
3502        def_levels: Option<&Vec<Vec<i16>>>,
3503        values: &[Vec<T::T>],
3504    ) -> Vec<Option<T::T>> {
3505        let data: Vec<Option<T::T>> = match def_levels {
3506            Some(levels) => {
3507                let mut values_iter = values.iter().flatten();
3508                levels
3509                    .iter()
3510                    .flatten()
3511                    .map(|d| match d {
3512                        1 => Some(values_iter.next().cloned().unwrap()),
3513                        0 => None,
3514                        _ => unreachable!(),
3515                    })
3516                    .collect()
3517            }
3518            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3519        };
3520        data
3521    }
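    // Worked example: def_levels = [[1, 0, 1]] with values = [[7, 9]] yields
    // [Some(7), None, Some(9)] -- a definition level of 1 consumes the next
    // generated value while a definition level of 0 produces a null slot.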
3522
3523    fn generate_single_column_file_with_data<T: DataType>(
3524        values: &[Vec<T::T>],
3525        def_levels: Option<&Vec<Vec<i16>>>,
3526        file: File,
3527        schema: TypePtr,
3528        field: Option<Field>,
3529        opts: &TestOptions,
3530    ) -> Result<ParquetMetaData> {
3531        let mut writer_props = opts.writer_props();
3532        if let Some(field) = field {
3533            let arrow_schema = Schema::new(vec![field]);
3534            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3535        }
3536
3537        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3538
3539        for (idx, v) in values.iter().enumerate() {
3540            let def_levels = def_levels.map(|d| d[idx].as_slice());
3541            let mut row_group_writer = writer.next_row_group()?;
3542            {
3543                let mut column_writer = row_group_writer
3544                    .next_column()?
3545                    .expect("Column writer is none!");
3546
3547                column_writer
3548                    .typed::<T>()
3549                    .write_batch(v, def_levels, None)?;
3550
3551                column_writer.close()?;
3552            }
3553            row_group_writer.close()?;
3554        }
3555
3556        writer.close()
3557    }
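    // Note that each entry of `values` (together with the matching `def_levels`
    // slice, if any) is written as its own row group, so the resulting file always
    // contains `opts.num_row_groups` row groups of `opts.num_rows` rows each.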
3558
3559    fn get_test_file(file_name: &str) -> File {
3560        let mut path = PathBuf::new();
3561        path.push(arrow::util::test_util::arrow_test_data());
3562        path.push(file_name);
3563
3564        File::open(path.as_path()).expect("File not found!")
3565    }
3566
3567    #[test]
3568    fn test_read_structs() {
3569        // This particular test file has columns of struct types where there is
3570        // a column that has the same name as one of the struct fields
3571        // (see: ARROW-11452)
3572        let testdata = arrow::util::test_util::parquet_test_data();
3573        let path = format!("{testdata}/nested_structs.rust.parquet");
3574        let file = File::open(&path).unwrap();
3575        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3576
3577        for batch in record_batch_reader {
3578            batch.unwrap();
3579        }
3580
3581        let file = File::open(&path).unwrap();
3582        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3583
3584        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3585        let projected_reader = builder
3586            .with_projection(mask)
3587            .with_batch_size(60)
3588            .build()
3589            .unwrap();
3590
3591        let expected_schema = Schema::new(vec![
3592            Field::new(
3593                "roll_num",
3594                ArrowDataType::Struct(Fields::from(vec![Field::new(
3595                    "count",
3596                    ArrowDataType::UInt64,
3597                    false,
3598                )])),
3599                false,
3600            ),
3601            Field::new(
3602                "PC_CUR",
3603                ArrowDataType::Struct(Fields::from(vec![
3604                    Field::new("mean", ArrowDataType::Int64, false),
3605                    Field::new("sum", ArrowDataType::Int64, false),
3606                ])),
3607                false,
3608            ),
3609        ]);
3610
3611        // Tests for #1652 and #1654
3612        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3613
3614        for batch in projected_reader {
3615            let batch = batch.unwrap();
3616            assert_eq!(batch.schema().as_ref(), &expected_schema);
3617        }
3618    }
3619
3620    #[test]
3621    // same as test_read_structs but constructs projection mask via column names
3622    fn test_read_structs_by_name() {
3623        let testdata = arrow::util::test_util::parquet_test_data();
3624        let path = format!("{testdata}/nested_structs.rust.parquet");
3625        let file = File::open(&path).unwrap();
3626        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3627
3628        for batch in record_batch_reader {
3629            batch.unwrap();
3630        }
3631
3632        let file = File::open(&path).unwrap();
3633        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3634
3635        let mask = ProjectionMask::columns(
3636            builder.parquet_schema(),
3637            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3638        );
3639        let projected_reader = builder
3640            .with_projection(mask)
3641            .with_batch_size(60)
3642            .build()
3643            .unwrap();
3644
3645        let expected_schema = Schema::new(vec![
3646            Field::new(
3647                "roll_num",
3648                ArrowDataType::Struct(Fields::from(vec![Field::new(
3649                    "count",
3650                    ArrowDataType::UInt64,
3651                    false,
3652                )])),
3653                false,
3654            ),
3655            Field::new(
3656                "PC_CUR",
3657                ArrowDataType::Struct(Fields::from(vec![
3658                    Field::new("mean", ArrowDataType::Int64, false),
3659                    Field::new("sum", ArrowDataType::Int64, false),
3660                ])),
3661                false,
3662            ),
3663        ]);
3664
3665        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3666
3667        for batch in projected_reader {
3668            let batch = batch.unwrap();
3669            assert_eq!(batch.schema().as_ref(), &expected_schema);
3670        }
3671    }
3672
3673    #[test]
3674    fn test_read_maps() {
3675        let testdata = arrow::util::test_util::parquet_test_data();
3676        let path = format!("{testdata}/nested_maps.snappy.parquet");
3677        let file = File::open(path).unwrap();
3678        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3679
3680        for batch in record_batch_reader {
3681            batch.unwrap();
3682        }
3683    }
3684
3685    #[test]
3686    fn test_nested_nullability() {
3687        let message_type = "message nested {
3688          OPTIONAL Group group {
3689            REQUIRED INT32 leaf;
3690          }
3691        }";
3692
3693        let file = tempfile::tempfile().unwrap();
3694        let schema = Arc::new(parse_message_type(message_type).unwrap());
3695
3696        {
3697            // Write using low-level parquet API (#1167)
3698            let mut writer =
3699                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3700                    .unwrap();
3701
3702            {
3703                let mut row_group_writer = writer.next_row_group().unwrap();
3704                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3705
3706                column_writer
3707                    .typed::<Int32Type>()
3708                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3709                    .unwrap();
3710
3711                column_writer.close().unwrap();
3712                row_group_writer.close().unwrap();
3713            }
3714
3715            writer.close().unwrap();
3716        }
3717
3718        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3719        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3720
3721        let reader = builder.with_projection(mask).build().unwrap();
3722
3723        let expected_schema = Schema::new(vec![Field::new(
3724            "group",
3725            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3726            true,
3727        )]);
3728
3729        let batch = reader.into_iter().next().unwrap().unwrap();
3730        assert_eq!(batch.schema().as_ref(), &expected_schema);
3731        assert_eq!(batch.num_rows(), 4);
3732        assert_eq!(batch.column(0).null_count(), 2);
3733    }
3734
3735    #[test]
3736    fn test_invalid_utf8() {
3737        // a parquet file with 1 column with invalid utf8
3738        let data = vec![
3739            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3740            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3741            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3742            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3743            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3744            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3745            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3746            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3747            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3748            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3749            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3750            82, 49,
3751        ];
3752
3753        let file = Bytes::from(data);
3754        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3755
3756        let error = record_batch_reader.next().unwrap().unwrap_err();
3757
3758        assert!(
3759            error.to_string().contains("invalid utf-8 sequence"),
3760            "{}",
3761            error
3762        );
3763    }
3764
3765    #[test]
3766    fn test_invalid_utf8_string_array() {
3767        test_invalid_utf8_string_array_inner::<i32>();
3768    }
3769
3770    #[test]
3771    fn test_invalid_utf8_large_string_array() {
3772        test_invalid_utf8_string_array_inner::<i64>();
3773    }
3774
3775    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3776        let cases = [
3777            invalid_utf8_first_char::<O>(),
3778            invalid_utf8_first_char_long_strings::<O>(),
3779            invalid_utf8_later_char::<O>(),
3780            invalid_utf8_later_char_long_strings::<O>(),
3781            invalid_utf8_later_char_really_long_strings::<O>(),
3782            invalid_utf8_later_char_really_long_strings2::<O>(),
3783        ];
3784        for array in &cases {
3785            for encoding in STRING_ENCODINGS {
3786                // since the data is not valid utf8 we cannot safely construct a correct
3787                // StringArray, so purposely create an invalid StringArray
3788                let array = unsafe {
3789                    GenericStringArray::<O>::new_unchecked(
3790                        array.offsets().clone(),
3791                        array.values().clone(),
3792                        array.nulls().cloned(),
3793                    )
3794                };
3795                let data_type = array.data_type().clone();
3796                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3797                let err = read_from_parquet(data).unwrap_err();
3798                let expected_err =
3799                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3800                assert!(
3801                    err.to_string().contains(expected_err),
3802                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3803                );
3804            }
3805        }
3806    }
3807
3808    #[test]
3809    fn test_invalid_utf8_string_view_array() {
3810        let cases = [
3811            invalid_utf8_first_char::<i32>(),
3812            invalid_utf8_first_char_long_strings::<i32>(),
3813            invalid_utf8_later_char::<i32>(),
3814            invalid_utf8_later_char_long_strings::<i32>(),
3815            invalid_utf8_later_char_really_long_strings::<i32>(),
3816            invalid_utf8_later_char_really_long_strings2::<i32>(),
3817        ];
3818
3819        for encoding in STRING_ENCODINGS {
3820            for array in &cases {
3821                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3822                let array = array.as_binary_view();
3823
3824                // since the data is not valid utf8 we cannot safely construct a correct
3825                // StringArray, so purposely create an invalid StringViewArray
3826                let array = unsafe {
3827                    StringViewArray::new_unchecked(
3828                        array.views().clone(),
3829                        array.data_buffers().to_vec(),
3830                        array.nulls().cloned(),
3831                    )
3832                };
3833
3834                let data_type = array.data_type().clone();
3835                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3836                let err = read_from_parquet(data).unwrap_err();
3837                let expected_err =
3838                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3839                assert!(
3840                    err.to_string().contains(expected_err),
3841                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3842                );
3843            }
3844        }
3845    }
3846
3847    /// Encodings suitable for string data
3848    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3849        None,
3850        Some(Encoding::PLAIN),
3851        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3852        Some(Encoding::DELTA_BYTE_ARRAY),
3853    ];
3854
3855    /// Invalid UTF-8 sequence in the first character
3856    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3857    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3858
3859    /// Invalid UTF-8 sequence in a character other than the first
3860    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3861    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3862
3863    /// returns a BinaryArray with invalid UTF8 data in the first character
3864    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3865        let valid: &[u8] = b"   ";
3866        let invalid = INVALID_UTF8_FIRST_CHAR;
3867        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3868    }
3869
3870    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3871    /// string larger than 12 bytes which is handled specially when reading
3872    /// `ByteViewArray`s
3873    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3874        let valid: &[u8] = b"   ";
3875        let mut invalid = vec![];
3876        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3877        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3878        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3879    }
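    // The 12 byte threshold mentioned above is significant because Arrow's
    // string/binary view layout stores values of at most 12 bytes inline in the
    // view struct, while longer values live in separate data buffers, so UTF-8
    // validation for the two shapes exercises different code paths.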
3880
3881    /// returns a BinaryArray with invalid UTF8 data in a character other than
3882    /// the first (this is checked in a special codepath)
3883    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3884        let valid: &[u8] = b"   ";
3885        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3886        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3887    }
3888
3889    /// returns a BinaryArray with invalid UTF8 data in a character other than
3890    /// the first in a string larger than 12 bytes which is handled specially
3891    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3892    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3893        let valid: &[u8] = b"   ";
3894        let mut invalid = vec![];
3895        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3896        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3897        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3898    }
3899
3900    /// returns a BinaryArray with invalid UTF8 data in a character other than
3901    /// the first in a string larger than 128 bytes which is handled specially
3902    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3903    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3904        let valid: &[u8] = b"   ";
3905        let mut invalid = vec![];
3906        for _ in 0..10 {
3907            // each instance is 38 bytes
3908            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3909        }
3910        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3911        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3912    }
3913
3914    /// returns a BinaryArray mixing a small value with invalid UTF8 data in a
3915    /// character other than the first together with a long (> 128 byte) valid value
3916    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3917        let valid: &[u8] = b"   ";
3918        let mut valid_long = vec![];
3919        for _ in 0..10 {
3920            // each instance is 38 bytes
3921            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3922        }
3923        let invalid = INVALID_UTF8_LATER_CHAR;
3924        GenericBinaryArray::<O>::from_iter(vec![
3925            None,
3926            Some(valid),
3927            Some(invalid),
3928            None,
3929            Some(&valid_long),
3930            Some(valid),
3931        ])
3932    }
3933
3934    /// writes the array into a single column parquet file with the specified
3935    /// encoding.
3936    ///
3937    /// If no encoding is specified, use default (dictionary) encoding
3938    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3939        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3940        let mut data = vec![];
3941        let schema = batch.schema();
3942        let props = encoding.map(|encoding| {
3943            WriterProperties::builder()
3944                // must disable dictionary encoding for the requested encoding to take effect
3945                .set_dictionary_enabled(false)
3946                .set_encoding(encoding)
3947                .build()
3948        });
3949
3950        {
3951            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3952            writer.write(&batch).unwrap();
3953            writer.flush().unwrap();
3954            writer.close().unwrap();
3955        };
3956        data
3957    }
3958
3959    /// read the parquet file into record batches
3960    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3961        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3962            .unwrap()
3963            .build()
3964            .unwrap();
3965
3966        reader.collect()
3967    }
3968
3969    #[test]
3970    fn test_dictionary_preservation() {
3971        let fields = vec![Arc::new(
3972            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3973                .with_repetition(Repetition::OPTIONAL)
3974                .with_converted_type(ConvertedType::UTF8)
3975                .build()
3976                .unwrap(),
3977        )];
3978
3979        let schema = Arc::new(
3980            Type::group_type_builder("test_schema")
3981                .with_fields(fields)
3982                .build()
3983                .unwrap(),
3984        );
3985
3986        let dict_type = ArrowDataType::Dictionary(
3987            Box::new(ArrowDataType::Int32),
3988            Box::new(ArrowDataType::Utf8),
3989        );
3990
3991        let arrow_field = Field::new("leaf", dict_type, true);
3992
3993        let mut file = tempfile::tempfile().unwrap();
3994
3995        let values = vec![
3996            vec![
3997                ByteArray::from("hello"),
3998                ByteArray::from("a"),
3999                ByteArray::from("b"),
4000                ByteArray::from("d"),
4001            ],
4002            vec![
4003                ByteArray::from("c"),
4004                ByteArray::from("a"),
4005                ByteArray::from("b"),
4006            ],
4007        ];
4008
4009        let def_levels = vec![
4010            vec![1, 0, 0, 1, 0, 0, 1, 1],
4011            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
4012        ];
4013
4014        let opts = TestOptions {
4015            encoding: Encoding::RLE_DICTIONARY,
4016            ..Default::default()
4017        };
4018
4019        generate_single_column_file_with_data::<ByteArrayType>(
4020            &values,
4021            Some(&def_levels),
4022            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
4023            schema,
4024            Some(arrow_field),
4025            &opts,
4026        )
4027        .unwrap();
4028
4029        file.rewind().unwrap();
4030
4031        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
4032
4033        let batches = record_reader
4034            .collect::<Result<Vec<RecordBatch>, _>>()
4035            .unwrap();
4036
4037        assert_eq!(batches.len(), 6);
4038        assert!(batches.iter().all(|x| x.num_columns() == 1));
4039
4040        let row_counts = batches
4041            .iter()
4042            .map(|x| (x.num_rows(), x.column(0).null_count()))
4043            .collect::<Vec<_>>();
4044
4045        assert_eq!(
4046            row_counts,
4047            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
4048        );
4049
4050        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
4051
4052        // First and second batch in same row group -> same dictionary
4053        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
4054        // Third batch spans the row group boundary -> computed dictionary
4055        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
4056        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
4057        // Fourth, fifth and sixth from same row group -> same dictionary
4058        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
4059        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
4060    }
4061
4062    #[test]
4063    fn test_read_null_list() {
4064        let testdata = arrow::util::test_util::parquet_test_data();
4065        let path = format!("{testdata}/null_list.parquet");
4066        let file = File::open(path).unwrap();
4067        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
4068
4069        let batch = record_batch_reader.next().unwrap().unwrap();
4070        assert_eq!(batch.num_rows(), 1);
4071        assert_eq!(batch.num_columns(), 1);
4072        assert_eq!(batch.column(0).len(), 1);
4073
4074        let list = batch
4075            .column(0)
4076            .as_any()
4077            .downcast_ref::<ListArray>()
4078            .unwrap();
4079        assert_eq!(list.len(), 1);
4080        assert!(list.is_valid(0));
4081
4082        let val = list.value(0);
4083        assert_eq!(val.len(), 0);
4084    }
4085
4086    #[test]
4087    fn test_null_schema_inference() {
4088        let testdata = arrow::util::test_util::parquet_test_data();
4089        let path = format!("{testdata}/null_list.parquet");
4090        let file = File::open(path).unwrap();
4091
4092        let arrow_field = Field::new(
4093            "emptylist",
4094            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
4095            true,
4096        );
4097
4098        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4099        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
4100        let schema = builder.schema();
4101        assert_eq!(schema.fields().len(), 1);
4102        assert_eq!(schema.field(0), &arrow_field);
4103    }
4104
4105    #[test]
4106    fn test_skip_metadata() {
4107        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
4108        let field = Field::new("col", col.data_type().clone(), true);
4109
4110        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
4111
4112        let metadata = [("key".to_string(), "value".to_string())]
4113            .into_iter()
4114            .collect();
4115
4116        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
4117
4118        assert_ne!(schema_with_metadata, schema_without_metadata);
4119
4120        let batch =
4121            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
4122
4123        let file = |version: WriterVersion| {
4124            let props = WriterProperties::builder()
4125                .set_writer_version(version)
4126                .build();
4127
4128            let file = tempfile().unwrap();
4129            let mut writer =
4130                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
4131                    .unwrap();
4132            writer.write(&batch).unwrap();
4133            writer.close().unwrap();
4134            file
4135        };
4136
4137        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4138
4139        let v1_reader = file(WriterVersion::PARQUET_1_0);
4140        let v2_reader = file(WriterVersion::PARQUET_2_0);
4141
4142        let arrow_reader =
4143            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
4144        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4145
4146        let reader =
4147            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
4148                .unwrap()
4149                .build()
4150                .unwrap();
4151        assert_eq!(reader.schema(), schema_without_metadata);
4152
4153        let arrow_reader =
4154            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
4155        assert_eq!(arrow_reader.schema(), schema_with_metadata);
4156
4157        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4158            .unwrap()
4159            .build()
4160            .unwrap();
4161        assert_eq!(reader.schema(), schema_without_metadata);
4162    }
4163
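    /// Writes the given (column name, array) pairs to a temporary parquet
    /// file using default writer properties and returns the file handle.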
4164    fn write_parquet_from_iter<I, F>(value: I) -> File
4165    where
4166        I: IntoIterator<Item = (F, ArrayRef)>,
4167        F: AsRef<str>,
4168    {
4169        let batch = RecordBatch::try_from_iter(value).unwrap();
4170        let file = tempfile().unwrap();
4171        let mut writer =
4172            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4173        writer.write(&batch).unwrap();
4174        writer.close().unwrap();
4175        file
4176    }
4177
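    /// Writes `value` to a parquet file, then asserts that opening it with the
    /// supplied `schema` via `ArrowReaderOptions::with_schema` fails with
    /// `expected_error`.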
4178    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4179    where
4180        I: IntoIterator<Item = (F, ArrayRef)>,
4181        F: AsRef<str>,
4182    {
4183        let file = write_parquet_from_iter(value);
4184        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4185        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4186            file.try_clone().unwrap(),
4187            options_with_schema,
4188        );
4189        assert_eq!(builder.err().unwrap().to_string(), expected_error);
4190    }
4191
4192    #[test]
4193    fn test_schema_too_few_columns() {
4194        run_schema_test_with_error(
4195            vec![
4196                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4197                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4198            ],
4199            Arc::new(Schema::new(vec![Field::new(
4200                "int64",
4201                ArrowDataType::Int64,
4202                false,
4203            )])),
4204            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4205        );
4206    }
4207
4208    #[test]
4209    fn test_schema_too_many_columns() {
4210        run_schema_test_with_error(
4211            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4212            Arc::new(Schema::new(vec![
4213                Field::new("int64", ArrowDataType::Int64, false),
4214                Field::new("int32", ArrowDataType::Int32, false),
4215            ])),
4216            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4217        );
4218    }
4219
4220    #[test]
4221    fn test_schema_mismatched_column_names() {
4222        run_schema_test_with_error(
4223            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4224            Arc::new(Schema::new(vec![Field::new(
4225                "other",
4226                ArrowDataType::Int64,
4227                false,
4228            )])),
4229            "Arrow: incompatible arrow schema, expected field named int64 got other",
4230        );
4231    }
4232
4233    #[test]
4234    fn test_schema_incompatible_columns() {
4235        run_schema_test_with_error(
4236            vec![
4237                (
4238                    "col1_invalid",
4239                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4240                ),
4241                (
4242                    "col2_valid",
4243                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4244                ),
4245                (
4246                    "col3_invalid",
4247                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4248                ),
4249            ],
4250            Arc::new(Schema::new(vec![
4251                Field::new("col1_invalid", ArrowDataType::Int32, false),
4252                Field::new("col2_valid", ArrowDataType::Int32, false),
4253                Field::new("col3_invalid", ArrowDataType::Int32, false),
4254            ])),
4255            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4256        );
4257    }
4258
4259    #[test]
4260    fn test_one_incompatible_nested_column() {
4261        let nested_fields = Fields::from(vec![
4262            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4263            Field::new("nested1_invalid", ArrowDataType::Int64, false),
4264        ]);
4265        let nested = StructArray::try_new(
4266            nested_fields,
4267            vec![
4268                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4269                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4270            ],
4271            None,
4272        )
4273        .expect("struct array");
4274        let supplied_nested_fields = Fields::from(vec![
4275            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4276            Field::new("nested1_invalid", ArrowDataType::Int32, false),
4277        ]);
4278        run_schema_test_with_error(
4279            vec![
4280                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4281                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4282                ("nested", Arc::new(nested) as ArrayRef),
4283            ],
4284            Arc::new(Schema::new(vec![
4285                Field::new("col1", ArrowDataType::Int64, false),
4286                Field::new("col2", ArrowDataType::Int32, false),
4287                Field::new(
4288                    "nested",
4289                    ArrowDataType::Struct(supplied_nested_fields),
4290                    false,
4291                ),
4292            ])),
4293            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4294            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4295            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4296        );
4297    }
4298
4299    /// Return parquet data with a single column of utf8 strings
4300    fn utf8_parquet() -> Bytes {
4301        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4302        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4303        let props = None;
4304        // write a parquet file with non-nullable strings
4305        let mut parquet_data = vec![];
4306        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4307        writer.write(&batch).unwrap();
4308        writer.close().unwrap();
4309        Bytes::from(parquet_data)
4310    }
4311
4312    #[test]
4313    fn test_schema_error_bad_types() {
4314        // verify incompatible schemas error on read
4315        let parquet_data = utf8_parquet();
4316
4317        // Ask to read it back with an incompatible schema (int vs string)
4318        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4319            "column1",
4320            arrow::datatypes::DataType::Int32,
4321            false,
4322        )]));
4323
4324        // read it back out
4325        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4326        let err =
4327            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4328                .unwrap_err();
4329        assert_eq!(
4330            err.to_string(),
4331            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4332        )
4333    }
4334
4335    #[test]
4336    fn test_schema_error_bad_nullability() {
4337        // verify incompatible schemas error on read
4338        let parquet_data = utf8_parquet();
4339
4340        // Ask to read it back with an incompatible schema (nullability mismatch)
4341        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4342            "column1",
4343            arrow::datatypes::DataType::Utf8,
4344            true,
4345        )]));
4346
4347        // read it back out
4348        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4349        let err =
4350            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4351                .unwrap_err();
4352        assert_eq!(
4353            err.to_string(),
4354            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4355        )
4356    }
4357
4358    #[test]
4359    fn test_read_binary_as_utf8() {
4360        let file = write_parquet_from_iter(vec![
4361            (
4362                "binary_to_utf8",
4363                Arc::new(BinaryArray::from(vec![
4364                    b"one".as_ref(),
4365                    b"two".as_ref(),
4366                    b"three".as_ref(),
4367                ])) as ArrayRef,
4368            ),
4369            (
4370                "large_binary_to_large_utf8",
4371                Arc::new(LargeBinaryArray::from(vec![
4372                    b"one".as_ref(),
4373                    b"two".as_ref(),
4374                    b"three".as_ref(),
4375                ])) as ArrayRef,
4376            ),
4377            (
4378                "binary_view_to_utf8_view",
4379                Arc::new(BinaryViewArray::from(vec![
4380                    b"one".as_ref(),
4381                    b"two".as_ref(),
4382                    b"three".as_ref(),
4383                ])) as ArrayRef,
4384            ),
4385        ]);
4386        let supplied_fields = Fields::from(vec![
4387            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4388            Field::new(
4389                "large_binary_to_large_utf8",
4390                ArrowDataType::LargeUtf8,
4391                false,
4392            ),
4393            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4394        ]);
4395
4396        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4397        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4398            file.try_clone().unwrap(),
4399            options,
4400        )
4401        .expect("reader builder with schema")
4402        .build()
4403        .expect("reader with schema");
4404
4405        let batch = arrow_reader.next().unwrap().unwrap();
4406        assert_eq!(batch.num_columns(), 3);
4407        assert_eq!(batch.num_rows(), 3);
4408        assert_eq!(
4409            batch
4410                .column(0)
4411                .as_string::<i32>()
4412                .iter()
4413                .collect::<Vec<_>>(),
4414            vec![Some("one"), Some("two"), Some("three")]
4415        );
4416
4417        assert_eq!(
4418            batch
4419                .column(1)
4420                .as_string::<i64>()
4421                .iter()
4422                .collect::<Vec<_>>(),
4423            vec![Some("one"), Some("two"), Some("three")]
4424        );
4425
4426        assert_eq!(
4427            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4428            vec![Some("one"), Some("two"), Some("three")]
4429        );
4430    }
4431
4432    #[test]
4433    #[should_panic(expected = "Invalid UTF8 sequence at")]
4434    fn test_read_non_utf8_binary_as_utf8() {
4435        let file = write_parquet_from_iter(vec![(
4436            "non_utf8_binary",
4437            Arc::new(BinaryArray::from(vec![
4438                b"\xDE\x00\xFF".as_ref(),
4439                b"\xDE\x01\xAA".as_ref(),
4440                b"\xDE\x02\xFF".as_ref(),
4441            ])) as ArrayRef,
4442        )]);
4443        let supplied_fields = Fields::from(vec![Field::new(
4444            "non_utf8_binary",
4445            ArrowDataType::Utf8,
4446            false,
4447        )]);
4448
4449        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4450        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4451            file.try_clone().unwrap(),
4452            options,
4453        )
4454        .expect("reader builder with schema")
4455        .build()
4456        .expect("reader with schema");
4457        arrow_reader.next().unwrap().unwrap_err();
4458    }
4459
4460    #[test]
4461    fn test_with_schema() {
4462        let nested_fields = Fields::from(vec![
4463            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4464            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4465        ]);
4466
4467        let nested_arrays: Vec<ArrayRef> = vec![
4468            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4469            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4470        ];
4471
4472        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4473
4474        let file = write_parquet_from_iter(vec![
4475            (
4476                "int32_to_ts_second",
4477                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4478            ),
4479            (
4480                "date32_to_date64",
4481                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4482            ),
4483            ("nested", Arc::new(nested) as ArrayRef),
4484        ]);
4485
4486        let supplied_nested_fields = Fields::from(vec![
4487            Field::new(
4488                "utf8_to_dict",
4489                ArrowDataType::Dictionary(
4490                    Box::new(ArrowDataType::Int32),
4491                    Box::new(ArrowDataType::Utf8),
4492                ),
4493                false,
4494            ),
4495            Field::new(
4496                "int64_to_ts_nano",
4497                ArrowDataType::Timestamp(
4498                    arrow::datatypes::TimeUnit::Nanosecond,
4499                    Some("+10:00".into()),
4500                ),
4501                false,
4502            ),
4503        ]);
4504
4505        let supplied_schema = Arc::new(Schema::new(vec![
4506            Field::new(
4507                "int32_to_ts_second",
4508                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4509                false,
4510            ),
4511            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4512            Field::new(
4513                "nested",
4514                ArrowDataType::Struct(supplied_nested_fields),
4515                false,
4516            ),
4517        ]));
4518
4519        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4520        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4521            file.try_clone().unwrap(),
4522            options,
4523        )
4524        .expect("reader builder with schema")
4525        .build()
4526        .expect("reader with schema");
4527
4528        assert_eq!(arrow_reader.schema(), supplied_schema);
4529        let batch = arrow_reader.next().unwrap().unwrap();
4530        assert_eq!(batch.num_columns(), 3);
4531        assert_eq!(batch.num_rows(), 4);
4532        assert_eq!(
4533            batch
4534                .column(0)
4535                .as_any()
4536                .downcast_ref::<TimestampSecondArray>()
4537                .expect("downcast to timestamp second")
4538                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4539                .map(|v| v.to_string())
4540                .expect("value as datetime"),
4541            "1970-01-01 01:00:00 +01:00"
4542        );
4543        assert_eq!(
4544            batch
4545                .column(1)
4546                .as_any()
4547                .downcast_ref::<Date64Array>()
4548                .expect("downcast to date64")
4549                .value_as_date(0)
4550                .map(|v| v.to_string())
4551                .expect("value as date"),
4552            "1970-01-01"
4553        );
4554
4555        let nested = batch
4556            .column(2)
4557            .as_any()
4558            .downcast_ref::<StructArray>()
4559            .expect("downcast to struct");
4560
4561        let nested_dict = nested
4562            .column(0)
4563            .as_any()
4564            .downcast_ref::<Int32DictionaryArray>()
4565            .expect("downcast to dictionary");
4566
4567        assert_eq!(
4568            nested_dict
4569                .values()
4570                .as_any()
4571                .downcast_ref::<StringArray>()
4572                .expect("downcast to string")
4573                .iter()
4574                .collect::<Vec<_>>(),
4575            vec![Some("a"), Some("b")]
4576        );
4577
4578        assert_eq!(
4579            nested_dict.keys().iter().collect::<Vec<_>>(),
4580            vec![Some(0), Some(0), Some(0), Some(1)]
4581        );
4582
4583        assert_eq!(
4584            nested
4585                .column(1)
4586                .as_any()
4587                .downcast_ref::<TimestampNanosecondArray>()
4588                .expect("downcast to timestamp nanosecond")
4589                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4590                .map(|v| v.to_string())
4591                .expect("value as datetime"),
4592            "1970-01-01 10:00:00.000000001 +10:00"
4593        );
4594    }
4595
4596    #[test]
4597    fn test_empty_projection() {
4598        let testdata = arrow::util::test_util::parquet_test_data();
4599        let path = format!("{testdata}/alltypes_plain.parquet");
4600        let file = File::open(path).unwrap();
4601
4602        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4603        let file_metadata = builder.metadata().file_metadata();
4604        let expected_rows = file_metadata.num_rows() as usize;
4605
4606        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4607        let batch_reader = builder
4608            .with_projection(mask)
4609            .with_batch_size(2)
4610            .build()
4611            .unwrap();
4612
4613        let mut total_rows = 0;
4614        for maybe_batch in batch_reader {
4615            let batch = maybe_batch.unwrap();
4616            total_rows += batch.num_rows();
4617            assert_eq!(batch.num_columns(), 0);
4618            assert!(batch.num_rows() <= 2);
4619        }
4620
4621        assert_eq!(total_rows, expected_rows);
4622    }
4623
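    /// Writes two list batches of `batch_size` rows each with the given
    /// maximum row group size, then reads them back with the same batch size
    /// and checks the row counts.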
4624    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4625        let schema = Arc::new(Schema::new(vec![Field::new(
4626            "list",
4627            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4628            true,
4629        )]));
4630
4631        let mut buf = Vec::with_capacity(1024);
4632
4633        let mut writer = ArrowWriter::try_new(
4634            &mut buf,
4635            schema.clone(),
4636            Some(
4637                WriterProperties::builder()
4638                    .set_max_row_group_size(row_group_size)
4639                    .build(),
4640            ),
4641        )
4642        .unwrap();
4643        for _ in 0..2 {
4644            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4645            for _ in 0..(batch_size) {
4646                list_builder.append(true);
4647            }
4648            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4649                .unwrap();
4650            writer.write(&batch).unwrap();
4651        }
4652        writer.close().unwrap();
4653
4654        let mut record_reader =
4655            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4656        assert_eq!(
4657            batch_size,
4658            record_reader.next().unwrap().unwrap().num_rows()
4659        );
4660        assert_eq!(
4661            batch_size,
4662            record_reader.next().unwrap().unwrap().num_rows()
4663        );
4664    }
4665
4666    #[test]
4667    fn test_row_group_exact_multiple() {
4668        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4669        test_row_group_batch(8, 8);
4670        test_row_group_batch(10, 8);
4671        test_row_group_batch(8, 10);
4672        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4673        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4674        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4675        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4676        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4677    }
4678
4679    /// Given a RecordBatch containing all the column data, return the expected batches given
4680    /// a `batch_size` and `selection`
4681    fn get_expected_batches(
4682        column: &RecordBatch,
4683        selection: &RowSelection,
4684        batch_size: usize,
4685    ) -> Vec<RecordBatch> {
4686        let mut expected_batches = vec![];
4687
4688        let mut selection: VecDeque<_> = selection.clone().into();
4689        let mut row_offset = 0;
4690        let mut last_start = None;
4691        while row_offset < column.num_rows() && !selection.is_empty() {
4692            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4693            while batch_remaining > 0 && !selection.is_empty() {
4694                let (to_read, skip) = match selection.front_mut() {
4695                    Some(selection) if selection.row_count > batch_remaining => {
4696                        selection.row_count -= batch_remaining;
4697                        (batch_remaining, selection.skip)
4698                    }
4699                    Some(_) => {
4700                        let select = selection.pop_front().unwrap();
4701                        (select.row_count, select.skip)
4702                    }
4703                    None => break,
4704                };
4705
4706                batch_remaining -= to_read;
4707
4708                match skip {
4709                    true => {
4710                        if let Some(last_start) = last_start.take() {
4711                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4712                        }
4713                        row_offset += to_read
4714                    }
4715                    false => {
4716                        last_start.get_or_insert(row_offset);
4717                        row_offset += to_read
4718                    }
4719                }
4720            }
4721        }
4722
4723        if let Some(last_start) = last_start.take() {
4724            expected_batches.push(column.slice(last_start, row_offset - last_start))
4725        }
4726
4727        // Sanity check: all batches except the final one should have `batch_size` rows
4728        for batch in &expected_batches[..expected_batches.len() - 1] {
4729            assert_eq!(batch.num_rows(), batch_size);
4730        }
4731
4732        expected_batches
4733    }
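
    // A minimal sketch (not part of the upstream suite) of what
    // `get_expected_batches` produces: selecting rows 1..=2 of a 4 row batch
    // with a batch size larger than the selection yields a single 2 row slice.
    #[test]
    fn test_get_expected_batches_sketch() {
        let column = RecordBatch::try_from_iter([(
            "v",
            Arc::new(Int32Array::from(vec![10, 11, 12, 13])) as ArrayRef,
        )])
        .unwrap();
        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(2),
            RowSelector::skip(1),
        ]);
        let expected = get_expected_batches(&column, &selection, 10);
        assert_eq!(expected, vec![column.slice(1, 2)]);
    }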
4734
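    /// Builds a `RowSelection` that alternates between selecting and skipping
    /// runs of `step_len` rows until `total_len` rows are covered, starting
    /// with a skip if `skip_first` is true. Also returns the number of
    /// selected rows.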
4735    fn create_test_selection(
4736        step_len: usize,
4737        total_len: usize,
4738        skip_first: bool,
4739    ) -> (RowSelection, usize) {
4740        let mut remaining = total_len;
4741        let mut skip = skip_first;
4742        let mut vec = vec![];
4743        let mut selected_count = 0;
4744        while remaining != 0 {
4745            let step = if remaining > step_len {
4746                step_len
4747            } else {
4748                remaining
4749            };
4750            vec.push(RowSelector {
4751                row_count: step,
4752                skip,
4753            });
4754            remaining -= step;
4755            if !skip {
4756                selected_count += step;
4757            }
4758            skip = !skip;
4759        }
4760        (vec.into(), selected_count)
4761    }
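
    // A minimal sketch (not part of the upstream suite) of the selection that
    // `create_test_selection` builds: runs of 3 over 8 rows, starting with a
    // selected run, gives select(3), skip(3), select(2) and 5 selected rows.
    #[test]
    fn test_create_test_selection_sketch() {
        let (selection, selected) = create_test_selection(3, 8, false);
        let selectors: Vec<RowSelector> = selection.into();
        assert_eq!(
            selectors,
            vec![
                RowSelector::select(3),
                RowSelector::skip(3),
                RowSelector::select(2)
            ]
        );
        assert_eq!(selected, 5);
    }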
4762
4763    #[test]
4764    fn test_scan_row_with_selection() {
4765        let testdata = arrow::util::test_util::parquet_test_data();
4766        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4767        let test_file = File::open(&path).unwrap();
4768
4769        let mut serial_reader =
4770            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4771        let data = serial_reader.next().unwrap().unwrap();
4772
4773        let do_test = |batch_size: usize, selection_len: usize| {
4774            for skip_first in [false, true] {
4775                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4776
4777                let expected = get_expected_batches(&data, &selections, batch_size);
4778                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4779                assert_eq!(
4780                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4781                    expected,
4782                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4783                );
4784            }
4785        };
4786
4787        // total row count 7300
4788        // 1. test selection len more than one page row count
4789        do_test(1000, 1000);
4790
4791        // 2. test selection len less than one page row count
4792        do_test(20, 20);
4793
4794        // 3. test selection_len less than batch_size
4795        do_test(20, 5);
4796
4797        // 4. test selection_len more than batch_size
4798        //    (batch_size < selection_len)
4799        do_test(5, 20);
4800
4801        fn create_skip_reader(
4802            test_file: &File,
4803            batch_size: usize,
4804            selections: RowSelection,
4805        ) -> ParquetRecordBatchReader {
4806            let options =
4807                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4808            let file = test_file.try_clone().unwrap();
4809            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4810                .unwrap()
4811                .with_batch_size(batch_size)
4812                .with_row_selection(selections)
4813                .build()
4814                .unwrap()
4815        }
4816    }
4817
4818    #[test]
4819    fn test_batch_size_overallocate() {
4820        let testdata = arrow::util::test_util::parquet_test_data();
4821        // `alltypes_plain.parquet` only has 8 rows
4822        let path = format!("{testdata}/alltypes_plain.parquet");
4823        let test_file = File::open(path).unwrap();
4824
4825        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4826        let num_rows = builder.metadata.file_metadata().num_rows();
4827        let reader = builder
4828            .with_batch_size(1024)
4829            .with_projection(ProjectionMask::all())
4830            .build()
4831            .unwrap();
4832        assert_ne!(1024, num_rows);
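        // The requested batch size (1024) exceeds the number of rows in the
        // file, so the effective batch size is clamped to `num_rows`.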
4833        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4834    }
4835
4836    #[test]
4837    fn test_read_with_page_index_enabled() {
4838        let testdata = arrow::util::test_util::parquet_test_data();
4839
4840        {
4841            // `alltypes_tiny_pages.parquet` has page index
4842            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4843            let test_file = File::open(path).unwrap();
4844            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4845                test_file,
4846                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4847            )
4848            .unwrap();
4849            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4850            let reader = builder.build().unwrap();
4851            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4852            assert_eq!(batches.len(), 8);
4853        }
4854
4855        {
4856            // `alltypes_plain.parquet` doesn't have page index
4857            let path = format!("{testdata}/alltypes_plain.parquet");
4858            let test_file = File::open(path).unwrap();
4859            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4860                test_file,
4861                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4862            )
4863            .unwrap();
4864            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4865            // we should still read the file successfully.
4866            assert!(builder.metadata().offset_index().is_none());
4867            let reader = builder.build().unwrap();
4868            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4869            assert_eq!(batches.len(), 1);
4870        }
4871    }
4872
4873    #[test]
4874    fn test_raw_repetition() {
4875        const MESSAGE_TYPE: &str = "
4876            message Log {
4877              OPTIONAL INT32 eventType;
4878              REPEATED INT32 category;
4879              REPEATED group filter {
4880                OPTIONAL INT32 error;
4881              }
4882            }
4883        ";
4884        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4885        let props = Default::default();
4886
4887        let mut buf = Vec::with_capacity(1024);
4888        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4889        let mut row_group_writer = writer.next_row_group().unwrap();
4890
4891        // column 0
4892        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4893        col_writer
4894            .typed::<Int32Type>()
4895            .write_batch(&[1], Some(&[1]), None)
4896            .unwrap();
4897        col_writer.close().unwrap();
4898        // column 1
4899        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4900        col_writer
4901            .typed::<Int32Type>()
4902            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4903            .unwrap();
4904        col_writer.close().unwrap();
4905        // column 2
4906        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4907        col_writer
4908            .typed::<Int32Type>()
4909            .write_batch(&[1], Some(&[1]), Some(&[0]))
4910            .unwrap();
4911        col_writer.close().unwrap();
4912
4913        let rg_md = row_group_writer.close().unwrap();
4914        assert_eq!(rg_md.num_rows(), 1);
4915        writer.close().unwrap();
4916
4917        let bytes = Bytes::from(buf);
4918
4919        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4920        let full = no_mask.next().unwrap().unwrap();
4921
4922        assert_eq!(full.num_columns(), 3);
4923
4924        for idx in 0..3 {
4925            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4926            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4927            let mut reader = b.with_projection(mask).build().unwrap();
4928            let projected = reader.next().unwrap().unwrap();
4929
4930            assert_eq!(projected.num_columns(), 1);
4931            assert_eq!(full.column(idx), projected.column(0));
4932        }
4933    }
4934
4935    #[test]
4936    fn test_read_lz4_raw() {
4937        let testdata = arrow::util::test_util::parquet_test_data();
4938        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4939        let file = File::open(path).unwrap();
4940
4941        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4942            .unwrap()
4943            .collect::<Result<Vec<_>, _>>()
4944            .unwrap();
4945        assert_eq!(batches.len(), 1);
4946        let batch = &batches[0];
4947
4948        assert_eq!(batch.num_columns(), 3);
4949        assert_eq!(batch.num_rows(), 4);
4950
4951        // https://github.com/apache/parquet-testing/pull/18
4952        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4953        assert_eq!(
4954            a.values(),
4955            &[1593604800, 1593604800, 1593604801, 1593604801]
4956        );
4957
4958        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4959        let a: Vec<_> = a.iter().flatten().collect();
4960        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4961
4962        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4963        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4964    }
4965
4966    // This test ensures backward compatibility: it reads 2 files that use the LZ4 CompressionCodec
4967    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4968    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4969    //    LZ4_HADOOP algorithm for compression.
4970    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4971    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4972    //    older parquet-cpp versions.
4973    //
4974    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4975    #[test]
4976    fn test_read_lz4_hadoop_fallback() {
4977        for file in [
4978            "hadoop_lz4_compressed.parquet",
4979            "non_hadoop_lz4_compressed.parquet",
4980        ] {
4981            let testdata = arrow::util::test_util::parquet_test_data();
4982            let path = format!("{testdata}/{file}");
4983            let file = File::open(path).unwrap();
4984            let expected_rows = 4;
4985
4986            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4987                .unwrap()
4988                .collect::<Result<Vec<_>, _>>()
4989                .unwrap();
4990            assert_eq!(batches.len(), 1);
4991            let batch = &batches[0];
4992
4993            assert_eq!(batch.num_columns(), 3);
4994            assert_eq!(batch.num_rows(), expected_rows);
4995
4996            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4997            assert_eq!(
4998                a.values(),
4999                &[1593604800, 1593604800, 1593604801, 1593604801]
5000            );
5001
5002            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
5003            let b: Vec<_> = b.iter().flatten().collect();
5004            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
5005
5006            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
5007            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
5008        }
5009    }
5010
5011    #[test]
5012    fn test_read_lz4_hadoop_large() {
5013        let testdata = arrow::util::test_util::parquet_test_data();
5014        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
5015        let file = File::open(path).unwrap();
5016        let expected_rows = 10000;
5017
5018        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
5019            .unwrap()
5020            .collect::<Result<Vec<_>, _>>()
5021            .unwrap();
5022        assert_eq!(batches.len(), 1);
5023        let batch = &batches[0];
5024
5025        assert_eq!(batch.num_columns(), 1);
5026        assert_eq!(batch.num_rows(), expected_rows);
5027
5028        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
5029        let a: Vec<_> = a.iter().flatten().collect();
5030        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
5031        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
5032        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
5033        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
5034    }
5035
5036    #[test]
5037    #[cfg(feature = "snap")]
5038    fn test_read_nested_lists() {
5039        let testdata = arrow::util::test_util::parquet_test_data();
5040        let path = format!("{testdata}/nested_lists.snappy.parquet");
5041        let file = File::open(path).unwrap();
5042
5043        let f = file.try_clone().unwrap();
5044        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
5045        let expected = reader.next().unwrap().unwrap();
5046        assert_eq!(expected.num_rows(), 3);
5047
5048        let selection = RowSelection::from(vec![
5049            RowSelector::skip(1),
5050            RowSelector::select(1),
5051            RowSelector::skip(1),
5052        ]);
5053        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5054            .unwrap()
5055            .with_row_selection(selection)
5056            .build()
5057            .unwrap();
5058
5059        let actual = reader.next().unwrap().unwrap();
5060        assert_eq!(actual.num_rows(), 1);
5061        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
5062    }
5063
5064    #[test]
5065    fn test_arbitrary_decimal() {
5066        let values = [1, 2, 3, 4, 5, 6, 7, 8];
5067        let decimals_19_0 = Decimal128Array::from_iter_values(values)
5068            .with_precision_and_scale(19, 0)
5069            .unwrap();
5070        let decimals_12_0 = Decimal128Array::from_iter_values(values)
5071            .with_precision_and_scale(12, 0)
5072            .unwrap();
5073        let decimals_17_10 = Decimal128Array::from_iter_values(values)
5074            .with_precision_and_scale(17, 10)
5075            .unwrap();
5076
5077        let written = RecordBatch::try_from_iter([
5078            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
5079            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
5080            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
5081        ])
5082        .unwrap();
5083
5084        let mut buffer = Vec::with_capacity(1024);
5085        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
5086        writer.write(&written).unwrap();
5087        writer.close().unwrap();
5088
5089        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
5090            .unwrap()
5091            .collect::<Result<Vec<_>, _>>()
5092            .unwrap();
5093
5094        assert_eq!(&written.slice(0, 8), &read[0]);
5095    }
5096
5097    #[test]
5098    fn test_list_skip() {
5099        let mut list = ListBuilder::new(Int32Builder::new());
5100        list.append_value([Some(1), Some(2)]);
5101        list.append_value([Some(3)]);
5102        list.append_value([Some(4)]);
5103        let list = list.finish();
5104        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
5105
5106        // First page contains 2 values but only 1 row
5107        let props = WriterProperties::builder()
5108            .set_data_page_row_count_limit(1)
5109            .set_write_batch_size(2)
5110            .build();
5111
5112        let mut buffer = Vec::with_capacity(1024);
5113        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
5114        writer.write(&batch).unwrap();
5115        writer.close().unwrap();
5116
5117        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
5118        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
5119            .unwrap()
5120            .with_row_selection(selection.into())
5121            .build()
5122            .unwrap();
5123        let out = reader.next().unwrap().unwrap();
5124        assert_eq!(out.num_rows(), 1);
5125        assert_eq!(out, batch.slice(2, 1));
5126    }
5127
5128    #[test]
5129    fn test_row_selection_interleaved_skip() -> Result<()> {
5130        let schema = Arc::new(Schema::new(vec![Field::new(
5131            "v",
5132            ArrowDataType::Int32,
5133            false,
5134        )]));
5135
5136        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
5137        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
5138
5139        let mut buffer = Vec::with_capacity(1024);
5140        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
5141        writer.write(&batch)?;
5142        writer.close()?;
5143
5144        let selection = RowSelection::from(vec![
5145            RowSelector::select(1),
5146            RowSelector::skip(2),
5147            RowSelector::select(2),
5148        ]);
5149
5150        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
5151            .with_batch_size(4)
5152            .with_row_selection(selection)
5153            .build()?;
5154
5155        let out = reader.next().unwrap()?;
5156        assert_eq!(out.num_rows(), 3);
5157        let values = out
5158            .column(0)
5159            .as_primitive::<arrow_array::types::Int32Type>()
5160            .values();
5161        assert_eq!(values, &[0, 3, 4]);
5162        assert!(reader.next().is_none());
5163        Ok(())
5164    }
5165
5166    #[test]
5167    fn test_row_selection_mask_sparse_rows() -> Result<()> {
5168        let schema = Arc::new(Schema::new(vec![Field::new(
5169            "v",
5170            ArrowDataType::Int32,
5171            false,
5172        )]));
5173
5174        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
5175        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
5176
5177        let mut buffer = Vec::with_capacity(1024);
5178        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
5179        writer.write(&batch)?;
5180        writer.close()?;
5181
5182        let total_rows = batch.num_rows();
5183        let ranges = (1..total_rows)
5184            .step_by(2)
5185            .map(|i| i..i + 1)
5186            .collect::<Vec<_>>();
5187        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
5188
5189        let selectors: Vec<RowSelector> = selection.clone().into();
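        // Sanity check: the selection is highly fragmented, with more than one
        // selector per eight rows of the file.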
5190        assert!(total_rows < selectors.len() * 8);
5191
5192        let bytes = Bytes::from(buffer);
5193
5194        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
5195            .with_batch_size(7)
5196            .with_row_selection(selection)
5197            .build()?;
5198
5199        let mut collected = Vec::new();
5200        for batch in reader {
5201            let batch = batch?;
5202            collected.extend_from_slice(
5203                batch
5204                    .column(0)
5205                    .as_primitive::<arrow_array::types::Int32Type>()
5206                    .values(),
5207            );
5208        }
5209
5210        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
5211        assert_eq!(collected, expected);
5212        Ok(())
5213    }
5214
5215    fn test_decimal32_roundtrip() {
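        // Decimal32 precision is at most 9, so the values are stored as INT32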
5216        let d = |values: Vec<i32>, p: u8| {
5217            let iter = values.into_iter();
5218            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
5219                .with_precision_and_scale(p, 2)
5220                .unwrap()
5221        };
5222
5223        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5224        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
5225
5226        let mut buffer = Vec::with_capacity(1024);
5227        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5228        writer.write(&batch).unwrap();
5229        writer.close().unwrap();
5230
5231        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5232        let t1 = builder.parquet_schema().columns()[0].physical_type();
5233        assert_eq!(t1, PhysicalType::INT32);
5234
5235        let mut reader = builder.build().unwrap();
5236        assert_eq!(batch.schema(), reader.schema());
5237
5238        let out = reader.next().unwrap().unwrap();
5239        assert_eq!(batch, out);
5240    }
5241
5242    fn test_decimal64_roundtrip() {
5243        // Precision <= 9 -> INT32
5244        // Precision <= 18 -> INT64
5245
5246        let d = |values: Vec<i64>, p: u8| {
5247            let iter = values.into_iter();
5248            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5249                .with_precision_and_scale(p, 2)
5250                .unwrap()
5251        };
5252
5253        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5254        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5255        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5256
5257        let batch = RecordBatch::try_from_iter([
5258            ("d1", Arc::new(d1) as ArrayRef),
5259            ("d2", Arc::new(d2) as ArrayRef),
5260            ("d3", Arc::new(d3) as ArrayRef),
5261        ])
5262        .unwrap();
5263
5264        let mut buffer = Vec::with_capacity(1024);
5265        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5266        writer.write(&batch).unwrap();
5267        writer.close().unwrap();
5268
5269        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5270        let t1 = builder.parquet_schema().columns()[0].physical_type();
5271        assert_eq!(t1, PhysicalType::INT32);
5272        let t2 = builder.parquet_schema().columns()[1].physical_type();
5273        assert_eq!(t2, PhysicalType::INT64);
5274        let t3 = builder.parquet_schema().columns()[2].physical_type();
5275        assert_eq!(t3, PhysicalType::INT64);
5276
5277        let mut reader = builder.build().unwrap();
5278        assert_eq!(batch.schema(), reader.schema());
5279
5280        let out = reader.next().unwrap().unwrap();
5281        assert_eq!(batch, out);
5282    }
5283
5284    fn test_decimal_roundtrip<T: DecimalType>() {
5285        // Precision <= 9 -> INT32
5286        // Precision <= 18 -> INT64
5287        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5288
5289        let d = |values: Vec<usize>, p: u8| {
5290            let iter = values.into_iter().map(T::Native::usize_as);
5291            PrimitiveArray::<T>::from_iter_values(iter)
5292                .with_precision_and_scale(p, 2)
5293                .unwrap()
5294        };
5295
5296        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5297        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5298        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5299        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5300
5301        let batch = RecordBatch::try_from_iter([
5302            ("d1", Arc::new(d1) as ArrayRef),
5303            ("d2", Arc::new(d2) as ArrayRef),
5304            ("d3", Arc::new(d3) as ArrayRef),
5305            ("d4", Arc::new(d4) as ArrayRef),
5306        ])
5307        .unwrap();
5308
5309        let mut buffer = Vec::with_capacity(1024);
5310        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5311        writer.write(&batch).unwrap();
5312        writer.close().unwrap();
5313
5314        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5315        let t1 = builder.parquet_schema().columns()[0].physical_type();
5316        assert_eq!(t1, PhysicalType::INT32);
5317        let t2 = builder.parquet_schema().columns()[1].physical_type();
5318        assert_eq!(t2, PhysicalType::INT64);
5319        let t3 = builder.parquet_schema().columns()[2].physical_type();
5320        assert_eq!(t3, PhysicalType::INT64);
5321        let t4 = builder.parquet_schema().columns()[3].physical_type();
5322        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5323
5324        let mut reader = builder.build().unwrap();
5325        assert_eq!(batch.schema(), reader.schema());
5326
5327        let out = reader.next().unwrap().unwrap();
5328        assert_eq!(batch, out);
5329    }
5330
5331    #[test]
5332    fn test_decimal() {
5333        test_decimal32_roundtrip();
5334        test_decimal64_roundtrip();
5335        test_decimal_roundtrip::<Decimal128Type>();
5336        test_decimal_roundtrip::<Decimal256Type>();
5337    }
5338
5339    #[test]
5340    fn test_list_selection() {
5341        let schema = Arc::new(Schema::new(vec![Field::new_list(
5342            "list",
5343            Field::new_list_field(ArrowDataType::Utf8, true),
5344            false,
5345        )]));
5346        let mut buf = Vec::with_capacity(1024);
5347
5348        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5349
5350        for i in 0..2 {
5351            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5352            for j in 0..1024 {
5353                list_a_builder.values().append_value(format!("{i} {j}"));
5354                list_a_builder.append(true);
5355            }
5356            let batch =
5357                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5358                    .unwrap();
5359            writer.write(&batch).unwrap();
5360        }
5361        let _metadata = writer.close().unwrap();
5362
5363        let buf = Bytes::from(buf);
5364        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5365            .unwrap()
5366            .with_row_selection(RowSelection::from(vec![
5367                RowSelector::skip(100),
5368                RowSelector::select(924),
5369                RowSelector::skip(100),
5370                RowSelector::select(924),
5371            ]))
5372            .build()
5373            .unwrap();
5374
5375        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5376        let batch = concat_batches(&schema, &batches).unwrap();
5377
5378        assert_eq!(batch.num_rows(), 924 * 2);
5379        let list = batch.column(0).as_list::<i32>();
5380
5381        for w in list.value_offsets().windows(2) {
5382            assert_eq!(w[0] + 1, w[1])
5383        }
5384        let mut values = list.values().as_string::<i32>().iter();
5385
5386        for i in 0..2 {
5387            for j in 100..1024 {
5388                let expected = format!("{i} {j}");
5389                assert_eq!(values.next().unwrap().unwrap(), &expected);
5390            }
5391        }
5392    }
5393
5394    #[test]
5395    fn test_list_selection_fuzz() {
5396        let mut rng = rng();
5397        let schema = Arc::new(Schema::new(vec![Field::new_list(
5398            "list",
5399            Field::new_list(
5400                Field::LIST_FIELD_DEFAULT_NAME,
5401                Field::new_list_field(ArrowDataType::Int32, true),
5402                true,
5403            ),
5404            true,
5405        )]));
5406        let mut buf = Vec::with_capacity(1024);
5407        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5408
5409        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5410
5411        for _ in 0..2048 {
5412            if rng.random_bool(0.2) {
5413                list_a_builder.append(false);
5414                continue;
5415            }
5416
5417            let list_a_len = rng.random_range(0..10);
5418            let list_b_builder = list_a_builder.values();
5419
5420            for _ in 0..list_a_len {
5421                if rng.random_bool(0.2) {
5422                    list_b_builder.append(false);
5423                    continue;
5424                }
5425
5426                let list_b_len = rng.random_range(0..10);
5427                let int_builder = list_b_builder.values();
5428                for _ in 0..list_b_len {
5429                    match rng.random_bool(0.2) {
5430                        true => int_builder.append_null(),
5431                        false => int_builder.append_value(rng.random()),
5432                    }
5433                }
5434                list_b_builder.append(true)
5435            }
5436            list_a_builder.append(true);
5437        }
5438
5439        let array = Arc::new(list_a_builder.finish());
5440        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5441
5442        writer.write(&batch).unwrap();
5443        let _metadata = writer.close().unwrap();
5444
5445        let buf = Bytes::from(buf);
5446
5447        let cases = [
5448            vec![
5449                RowSelector::skip(100),
5450                RowSelector::select(924),
5451                RowSelector::skip(100),
5452                RowSelector::select(924),
5453            ],
5454            vec![
5455                RowSelector::select(924),
5456                RowSelector::skip(100),
5457                RowSelector::select(924),
5458                RowSelector::skip(100),
5459            ],
5460            vec![
5461                RowSelector::skip(1023),
5462                RowSelector::select(1),
5463                RowSelector::skip(1023),
5464                RowSelector::select(1),
5465            ],
5466            vec![
5467                RowSelector::select(1),
5468                RowSelector::skip(1023),
5469                RowSelector::select(1),
5470                RowSelector::skip(1023),
5471            ],
5472        ];
5473
5474        for batch_size in [100, 1024, 2048] {
5475            for selection in &cases {
5476                let selection = RowSelection::from(selection.clone());
5477                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5478                    .unwrap()
5479                    .with_row_selection(selection.clone())
5480                    .with_batch_size(batch_size)
5481                    .build()
5482                    .unwrap();
5483
5484                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5485                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5486                assert_eq!(actual.num_rows(), selection.row_count());
5487
5488                let mut batch_offset = 0;
5489                let mut actual_offset = 0;
5490                for selector in selection.iter() {
5491                    if selector.skip {
5492                        batch_offset += selector.row_count;
5493                        continue;
5494                    }
5495
5496                    assert_eq!(
5497                        batch.slice(batch_offset, selector.row_count),
5498                        actual.slice(actual_offset, selector.row_count)
5499                    );
5500
5501                    batch_offset += selector.row_count;
5502                    actual_offset += selector.row_count;
5503                }
5504            }
5505        }
5506    }
5507
5508    #[test]
5509    fn test_read_old_nested_list() {
5510        use arrow::datatypes::DataType;
5511        use arrow::datatypes::ToByteSlice;
5512
5513        let testdata = arrow::util::test_util::parquet_test_data();
5514        // message my_record {
5515        //     REQUIRED group a (LIST) {
5516        //         REPEATED group array (LIST) {
5517        //             REPEATED INT32 array;
5518        //         }
5519        //     }
5520        // }
5521        // should be read as list<list<int32>>
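        // i.e. the file holds a single row whose value is the nested list [[1, 2], [3, 4]]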
5522        let path = format!("{testdata}/old_list_structure.parquet");
5523        let test_file = File::open(path).unwrap();
5524
5525        // create expected ListArray
5526        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5527
5528        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5529        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5530
5531        // Construct a list array from the above two
5532        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5533            "array",
5534            DataType::Int32,
5535            false,
5536        ))))
5537        .len(2)
5538        .add_buffer(a_value_offsets)
5539        .add_child_data(a_values.into_data())
5540        .build()
5541        .unwrap();
5542        let a = ListArray::from(a_list_data);
5543
5544        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5545        let mut reader = builder.build().unwrap();
5546        let out = reader.next().unwrap().unwrap();
5547        assert_eq!(out.num_rows(), 1);
5548        assert_eq!(out.num_columns(), 1);
5549        // grab first column
5550        let c0 = out.column(0);
5551        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5552        // get first row: [[1, 2], [3, 4]]
5553        let r0 = c0arr.value(0);
5554        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5555        assert_eq!(r0arr, &a);
5556    }
5557
5558    #[test]
5559    fn test_map_no_value() {
5560        // File schema:
5561        // message schema {
5562        //   required group my_map (MAP) {
5563        //     repeated group key_value {
5564        //       required int32 key;
5565        //       optional int32 value;
5566        //     }
5567        //   }
5568        //   required group my_map_no_v (MAP) {
5569        //     repeated group key_value {
5570        //       required int32 key;
5571        //     }
5572        //   }
5573        //   required group my_list (LIST) {
5574        //     repeated group list {
5575        //       required int32 element;
5576        //     }
5577        //   }
5578        // }
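        // A MAP whose key_value group has no value field is read as a list of its keys, so the
        // decoded my_map_no_v column should match my_list element for element.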
5579        let testdata = arrow::util::test_util::parquet_test_data();
5580        let path = format!("{testdata}/map_no_value.parquet");
5581        let file = File::open(path).unwrap();
5582
5583        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5584            .unwrap()
5585            .build()
5586            .unwrap();
5587        let out = reader.next().unwrap().unwrap();
5588        assert_eq!(out.num_rows(), 3);
5589        assert_eq!(out.num_columns(), 3);
5590        // my_map_no_v and my_list columns should now be equivalent
5591        let c0 = out.column(1).as_list::<i32>();
5592        let c1 = out.column(2).as_list::<i32>();
5593        assert_eq!(c0.len(), c1.len());
5594        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5595    }
5596
5597    #[test]
5598    fn test_row_filter_full_page_skip_is_handled() {
5599        let first_value: i64 = 1111;
5600        let last_value: i64 = 9999;
5601        let num_rows: usize = 12;
5602
5603        // Build 12 rows written as 6 data pages of 2 rows each. Only the first and last pages
5604        // contain values matched by the predicate below, so the effective row selection is
5605        // [select 1, skip 10, select 1].
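        // Page layout (data_page_row_count_limit = 2):
        //   page 0:      [1111, 1]                  <- contains a match
        //   pages 1..=4: [2, 3] [4, 5] [6, 7] [8, 9]
        //   page 5:      [10, 9999]                 <- contains a match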
5606        let schema = Arc::new(Schema::new(vec![
5607            Field::new("key", arrow_schema::DataType::Int64, false),
5608            Field::new("value", arrow_schema::DataType::Int64, false),
5609        ]));
5610
5611        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
5612        int_values[0] = first_value;
5613        int_values[num_rows - 1] = last_value;
5614        let keys = Int64Array::from(int_values.clone());
5615        let values = Int64Array::from(int_values.clone());
5616        let batch = RecordBatch::try_new(
5617            Arc::clone(&schema),
5618            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
5619        )
5620        .unwrap();
5621
5622        let props = WriterProperties::builder()
5623            .set_write_batch_size(2)
5624            .set_data_page_row_count_limit(2)
5625            .build();
5626
5627        let mut buffer = Vec::new();
5628        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
5629        writer.write(&batch).unwrap();
5630        writer.close().unwrap();
5631        let data = Bytes::from(buffer);
5632
5633        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
5634        let builder =
5635            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
5636        let schema = builder.parquet_schema().clone();
5637        let filter_mask = ProjectionMask::leaves(&schema, [0]);
5638
5639        let make_predicate = |mask: ProjectionMask| {
5640            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
5641                let column = batch.column(0);
5642                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
5643                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
5644                or(&match_first, &match_second)
5645            })
5646        };
5647
5648        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
5649        let predicate = make_predicate(filter_mask.clone());
5650
5651        // The batch size is set to 12 so that all surviving rows are returned in a single batch.
5652        // If the reader used a mask-backed plan for the filter it could panic, because the four middle pages are never decoded.
5653        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
5654            .unwrap()
5655            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
5656            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
5657            .with_batch_size(12)
5658            .build()
5659            .unwrap();
5660
5661        // Predicate pruning used to panic once mask-backed plans removed whole pages.
5662        // Collecting into batches validates the plan now downgrades to selectors instead.
5663        let schema = reader.schema().clone();
5664        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5665        let result = concat_batches(&schema, &batches).unwrap();
5666        assert_eq!(result.num_rows(), 2);
5667    }
5668
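    // The checked-in test file does not record bloom_filter_length in its footer, so this test
    // rewrites it with the current writer (bloom filters enabled) and verifies the filter can be
    // located and queried via the recorded length.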
5669    #[test]
5670    fn test_get_row_group_column_bloom_filter_with_length() {
5671        // Rewrite the file with the current writer so that bloom_filter_length is recorded in the footer
5672        let testdata = arrow::util::test_util::parquet_test_data();
5673        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5674        let file = File::open(path).unwrap();
5675        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5676        let schema = builder.schema().clone();
5677        let reader = builder.build().unwrap();
5678
5679        let mut parquet_data = Vec::new();
5680        let props = WriterProperties::builder()
5681            .set_bloom_filter_enabled(true)
5682            .build();
5683        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
5684        for batch in reader {
5685            let batch = batch.unwrap();
5686            writer.write(&batch).unwrap();
5687        }
5688        writer.close().unwrap();
5689
5690        // test the new parquet file
5691        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
5692    }
5693
5694    #[test]
5695    fn test_get_row_group_column_bloom_filter_without_length() {
5696        let testdata = arrow::util::test_util::parquet_test_data();
5697        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5698        let data = Bytes::from(std::fs::read(path).unwrap());
5699        test_get_row_group_column_bloom_filter(data, false);
5700    }
5701
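    // Shared helper: asserts whether bloom_filter_length is present in the column metadata, then
    // probes the SBBF. Bloom filters may return false positives but never false negatives, so the
    // value written to the file must be found; the absent probe is expected to miss here.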
5702    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5703        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5704
5705        let metadata = builder.metadata();
5706        assert_eq!(metadata.num_row_groups(), 1);
5707        let row_group = metadata.row_group(0);
5708        let column = row_group.column(0);
5709        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5710
5711        let sbbf = builder
5712            .get_row_group_column_bloom_filter(0, 0)
5713            .unwrap()
5714            .unwrap();
5715        assert!(sbbf.check(&"Hello"));
5716        assert!(!sbbf.check(&"Hello_Not_Exists"));
5717    }
5718
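    // A column annotated with a logical type this crate does not recognise should surface as
    // LogicalType::_Unknown rather than failing, and its data should still be readable through
    // the BYTE_ARRAY physical type.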
5719    #[test]
5720    fn test_read_unknown_logical_type() {
5721        let testdata = arrow::util::test_util::parquet_test_data();
5722        let path = format!("{testdata}/unknown-logical-type.parquet");
5723        let test_file = File::open(path).unwrap();
5724
5725        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5726            .expect("Error creating reader builder");
5727
5728        let schema = builder.metadata().file_metadata().schema_descr();
5729        assert_eq!(
5730            schema.column(0).logical_type_ref(),
5731            Some(&LogicalType::String)
5732        );
5733        assert_eq!(
5734            schema.column(1).logical_type_ref(),
5735            Some(&LogicalType::_Unknown { field_id: 2555 })
5736        );
5737        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5738
5739        let mut reader = builder.build().unwrap();
5740        let out = reader.next().unwrap().unwrap();
5741        assert_eq!(out.num_rows(), 3);
5742        assert_eq!(out.num_columns(), 2);
5743    }
5744
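    // The RowNumber virtual column makes the reader append a synthesized Int64 column containing
    // each row's 0-based position within the file, alongside the projected data columns.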
5745    #[test]
5746    fn test_read_row_numbers() {
5747        let file = write_parquet_from_iter(vec![(
5748            "value",
5749            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5750        )]);
5751        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5752
5753        let row_number_field = Arc::new(
5754            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5755        );
5756
5757        let options = ArrowReaderOptions::new()
5758            .with_schema(Arc::new(Schema::new(supplied_fields)))
5759            .with_virtual_columns(vec![row_number_field.clone()])
5760            .unwrap();
5761        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5762            file.try_clone().unwrap(),
5763            options,
5764        )
5765        .expect("reader builder with schema")
5766        .build()
5767        .expect("reader with schema");
5768
5769        let batch = arrow_reader.next().unwrap().unwrap();
5770        let schema = Arc::new(Schema::new(vec![
5771            Field::new("value", ArrowDataType::Int64, false),
5772            (*row_number_field).clone(),
5773        ]));
5774
5775        assert_eq!(batch.schema(), schema);
5776        assert_eq!(batch.num_columns(), 2);
5777        assert_eq!(batch.num_rows(), 3);
5778        assert_eq!(
5779            batch
5780                .column(0)
5781                .as_primitive::<types::Int64Type>()
5782                .iter()
5783                .collect::<Vec<_>>(),
5784            vec![Some(1), Some(2), Some(3)]
5785        );
5786        assert_eq!(
5787            batch
5788                .column(1)
5789                .as_primitive::<types::Int64Type>()
5790                .iter()
5791                .collect::<Vec<_>>(),
5792            vec![Some(0), Some(1), Some(2)]
5793        );
5794    }
5795
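    // With ProjectionMask::none no data columns are decoded, but the batch still carries the
    // correct row count and contains only the virtual row-number column.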
5796    #[test]
5797    fn test_read_only_row_numbers() {
5798        let file = write_parquet_from_iter(vec![(
5799            "value",
5800            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5801        )]);
5802        let row_number_field = Arc::new(
5803            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5804        );
5805        let options = ArrowReaderOptions::new()
5806            .with_virtual_columns(vec![row_number_field.clone()])
5807            .unwrap();
5808        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5809        let num_columns = metadata
5810            .metadata
5811            .file_metadata()
5812            .schema_descr()
5813            .num_columns();
5814
5815        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5816            .with_projection(ProjectionMask::none(num_columns))
5817            .build()
5818            .expect("reader with schema");
5819
5820        let batch = arrow_reader.next().unwrap().unwrap();
5821        let schema = Arc::new(Schema::new(vec![row_number_field]));
5822
5823        assert_eq!(batch.schema(), schema);
5824        assert_eq!(batch.num_columns(), 1);
5825        assert_eq!(batch.num_rows(), 3);
5826        assert_eq!(
5827            batch
5828                .column(0)
5829                .as_primitive::<types::Int64Type>()
5830                .iter()
5831                .collect::<Vec<_>>(),
5832            vec![Some(0), Some(1), Some(2)]
5833        );
5834    }
5835
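    // Row numbers are derived from a row group's position within the file, not from the order in
    // which row groups are read, so reading groups [1, 0] yields row numbers 50..100 then 0..50.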
5836    #[test]
5837    fn test_read_row_numbers_row_group_order() -> Result<()> {
5838        // Make a parquet file with 100 rows split across 2 row groups
5839        let array = Int64Array::from_iter_values(5000..5100);
5840        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5841        let mut buffer = Vec::new();
5842        let options = WriterProperties::builder()
5843            .set_max_row_group_size(50)
5844            .build();
5845        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5846        // write in 10-row batches, as the row group size limit is enforced after each batch
5847        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5848            writer.write(&batch_chunk)?;
5849        }
5850        writer.close()?;
5851
5852        let row_number_field = Arc::new(
5853            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5854        );
5855
5856        let buffer = Bytes::from(buffer);
5857
5858        let options =
5859            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5860
5861        // read out with normal options
5862        let arrow_reader =
5863            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5864                .build()?;
5865
5866        assert_eq!(
5867            ValuesAndRowNumbers {
5868                values: (5000..5100).collect(),
5869                row_numbers: (0..100).collect()
5870            },
5871            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5872        );
5873
5874        // Now read with the row groups out of order
5875        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5876            .with_row_groups(vec![1, 0])
5877            .build()?;
5878
5879        assert_eq!(
5880            ValuesAndRowNumbers {
5881                values: (5050..5100).chain(5000..5050).collect(),
5882                row_numbers: (50..100).chain(0..50).collect(),
5883            },
5884            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5885        );
5886
5887        Ok(())
5888    }
5889
5890    #[derive(Debug, PartialEq)]
5891    struct ValuesAndRowNumbers {
5892        values: Vec<i64>,
5893        row_numbers: Vec<i64>,
5894    }
5895    impl ValuesAndRowNumbers {
5896        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5897            let mut values = vec![];
5898            let mut row_numbers = vec![];
5899            for batch in reader {
5900                let batch = batch.expect("Could not read batch");
5901                values.extend(
5902                    batch
5903                        .column_by_name("col")
5904                        .expect("Could not get col column")
5905                        .as_primitive::<arrow::datatypes::Int64Type>()
5906                        .iter()
5907                        .map(|v| v.expect("Could not get value")),
5908                );
5909
5910                row_numbers.extend(
5911                    batch
5912                        .column_by_name("row_number")
5913                        .expect("Could not get row_number column")
5914                        .as_primitive::<arrow::datatypes::Int64Type>()
5915                        .iter()
5916                        .map(|v| v.expect("Could not get row number")),
5917                );
5919            }
5920            Self {
5921                values,
5922                row_numbers,
5923            }
5924        }
5925    }
5926
5927    #[test]
5928    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5929        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5930        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5931        assert_eq!(
5932            ArrowReaderOptions::new()
5933                .with_virtual_columns(vec![regular_field])
5934                .unwrap_err()
5935                .to_string(),
5936            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5937        );
5938    }
5939
5940    #[test]
5941    fn test_row_numbers_with_multiple_row_groups() {
5942        test_row_numbers_with_multiple_row_groups_helper(
5943            false,
5944            |path, selection, _row_filter, batch_size| {
5945                let file = File::open(path).unwrap();
5946                let row_number_field = Arc::new(
5947                    Field::new("row_number", ArrowDataType::Int64, false)
5948                        .with_extension_type(RowNumber),
5949                );
5950                let options = ArrowReaderOptions::new()
5951                    .with_virtual_columns(vec![row_number_field])
5952                    .unwrap();
5953                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5954                    .unwrap()
5955                    .with_row_selection(selection)
5956                    .with_batch_size(batch_size)
5957                    .build()
5958                    .expect("Could not create reader");
5959                reader
5960                    .collect::<Result<Vec<_>, _>>()
5961                    .expect("Could not read")
5962            },
5963        );
5964    }
5965
5966    #[test]
5967    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5968        test_row_numbers_with_multiple_row_groups_helper(
5969            true,
5970            |path, selection, row_filter, batch_size| {
5971                let file = File::open(path).unwrap();
5972                let row_number_field = Arc::new(
5973                    Field::new("row_number", ArrowDataType::Int64, false)
5974                        .with_extension_type(RowNumber),
5975                );
5976                let options = ArrowReaderOptions::new()
5977                    .with_virtual_columns(vec![row_number_field])
5978                    .unwrap();
5979                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5980                    .unwrap()
5981                    .with_row_selection(selection)
5982                    .with_batch_size(batch_size)
5983                    .with_row_filter(row_filter.expect("No filter"))
5984                    .build()
5985                    .expect("Could not create reader");
5986                reader
5987                    .collect::<Result<Vec<_>, _>>()
5988                    .expect("Could not read")
5989            },
5990        );
5991    }
5992
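    // The RowGroupIndex virtual column reports, for every row, the index of the row group it was
    // read from; with three 2-row groups the expected values are [0, 0, 1, 1, 2, 2].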
5993    #[test]
5994    fn test_read_row_group_indices() {
5995        // create a parquet file with 3 row groups, 2 rows each
5996        let array1 = Int64Array::from(vec![1, 2]);
5997        let array2 = Int64Array::from(vec![3, 4]);
5998        let array3 = Int64Array::from(vec![5, 6]);
5999
6000        let batch1 =
6001            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
6002        let batch2 =
6003            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
6004        let batch3 =
6005            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
6006
6007        let mut buffer = Vec::new();
6008        let options = WriterProperties::builder()
6009            .set_max_row_group_size(2)
6010            .build();
6011        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
6012        writer.write(&batch1).unwrap();
6013        writer.write(&batch2).unwrap();
6014        writer.write(&batch3).unwrap();
6015        writer.close().unwrap();
6016
6017        let file = Bytes::from(buffer);
6018        let row_group_index_field = Arc::new(
6019            Field::new("row_group_index", ArrowDataType::Int64, false)
6020                .with_extension_type(RowGroupIndex),
6021        );
6022
6023        let options = ArrowReaderOptions::new()
6024            .with_virtual_columns(vec![row_group_index_field.clone()])
6025            .unwrap();
6026        let mut arrow_reader =
6027            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
6028                .expect("reader builder with virtual columns")
6029                .build()
6030                .expect("reader with virtual columns");
6031
6032        let batch = arrow_reader.next().unwrap().unwrap();
6033
6034        assert_eq!(batch.num_columns(), 2);
6035        assert_eq!(batch.num_rows(), 6);
6036
6037        assert_eq!(
6038            batch
6039                .column(0)
6040                .as_primitive::<types::Int64Type>()
6041                .iter()
6042                .collect::<Vec<_>>(),
6043            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
6044        );
6045
6046        assert_eq!(
6047            batch
6048                .column(1)
6049                .as_primitive::<types::Int64Type>()
6050                .iter()
6051                .collect::<Vec<_>>(),
6052            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
6053        );
6054    }
6055
6056    #[test]
6057    fn test_read_only_row_group_indices() {
6058        let array1 = Int64Array::from(vec![1, 2, 3]);
6059        let array2 = Int64Array::from(vec![4, 5]);
6060
6061        let batch1 =
6062            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
6063        let batch2 =
6064            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
6065
6066        let mut buffer = Vec::new();
6067        let options = WriterProperties::builder()
6068            .set_max_row_group_size(3)
6069            .build();
6070        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
6071        writer.write(&batch1).unwrap();
6072        writer.write(&batch2).unwrap();
6073        writer.close().unwrap();
6074
6075        let file = Bytes::from(buffer);
6076        let row_group_index_field = Arc::new(
6077            Field::new("row_group_index", ArrowDataType::Int64, false)
6078                .with_extension_type(RowGroupIndex),
6079        );
6080
6081        let options = ArrowReaderOptions::new()
6082            .with_virtual_columns(vec![row_group_index_field.clone()])
6083            .unwrap();
6084        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
6085        let num_columns = metadata
6086            .metadata
6087            .file_metadata()
6088            .schema_descr()
6089            .num_columns();
6090
6091        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
6092            .with_projection(ProjectionMask::none(num_columns))
6093            .build()
6094            .expect("reader with virtual columns only");
6095
6096        let batch = arrow_reader.next().unwrap().unwrap();
6097        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
6098
6099        assert_eq!(batch.schema(), schema);
6100        assert_eq!(batch.num_columns(), 1);
6101        assert_eq!(batch.num_rows(), 5);
6102
6103        assert_eq!(
6104            batch
6105                .column(0)
6106                .as_primitive::<types::Int64Type>()
6107                .iter()
6108                .collect::<Vec<_>>(),
6109            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
6110        );
6111    }
6112
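    // Reading row groups in reverse order ([2, 1, 0]) must still report each row's original row
    // group index: the virtual column starts at 2 and ends at 0 while the values start at 20 and
    // end at 9.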
6113    #[test]
6114    fn test_read_row_group_indices_with_selection() -> Result<()> {
6115        let mut buffer = Vec::new();
6116        let options = WriterProperties::builder()
6117            .set_max_row_group_size(10)
6118            .build();
6119
6120        let schema = Arc::new(Schema::new(vec![Field::new(
6121            "value",
6122            ArrowDataType::Int64,
6123            false,
6124        )]));
6125
6126        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
6127
6128        // write out 3 batches of 10 rows each
6129        for i in 0..3 {
6130            let start = i * 10;
6131            let array = Int64Array::from_iter_values(start..start + 10);
6132            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
6133            writer.write(&batch)?;
6134        }
6135        writer.close()?;
6136
6137        let file = Bytes::from(buffer);
6138        let row_group_index_field = Arc::new(
6139            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
6140        );
6141
6142        let options =
6143            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
6144
6145        // test row groups are read in reverse order
6146        let arrow_reader =
6147            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
6148                .with_row_groups(vec![2, 1, 0])
6149                .build()?;
6150
6151        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
6152        let combined = concat_batches(&batches[0].schema(), &batches)?;
6153
6154        let values = combined.column(0).as_primitive::<types::Int64Type>();
6155        let first_val = values.value(0);
6156        let last_val = values.value(combined.num_rows() - 1);
6157        // first row from rg 2
6158        assert_eq!(first_val, 20);
6159        // the last row from rg 0
6160        assert_eq!(last_val, 9);
6161
6162        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
6163        assert_eq!(rg_indices.value(0), 2);
6164        assert_eq!(rg_indices.value(10), 1);
6165        assert_eq!(rg_indices.value(20), 0);
6166
6167        Ok(())
6168    }
6169
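    // Test harness shared by the row-number tests above: generates a multi-row-group file whose
    // values are each row's 1-based position, builds a random skip/select RowSelection (plus,
    // optionally, a random ~99%-pass row filter), and asserts that value == row_number + 1 for
    // every row returned by the caller's reader.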
6170    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
6171        use_filter: bool,
6172        test_case: F,
6173    ) where
6174        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
6175    {
6176        let seed: u64 = random();
6177        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
6178        let mut rng = StdRng::seed_from_u64(seed);
6179
6180        use tempfile::TempDir;
6181        let tempdir = TempDir::new().expect("Could not create temp dir");
6182
6183        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
6184
6185        let path = tempdir.path().join("test.parquet");
6186        std::fs::write(&path, bytes).expect("Could not write file");
6187
6188        let mut case = vec![];
6189        let mut remaining = metadata.file_metadata().num_rows();
6190        while remaining > 0 {
6191            let row_count = rng.random_range(1..=remaining);
6192            remaining -= row_count;
6193            case.push(RowSelector {
6194                row_count: row_count as usize,
6195                skip: rng.random_bool(0.5),
6196            });
6197        }
6198
6199        let filter = use_filter.then(|| {
6200            let filter = (0..metadata.file_metadata().num_rows())
6201                .map(|_| rng.random_bool(0.99))
6202                .collect::<Vec<_>>();
6203            let mut filter_offset = 0;
6204            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
6205                ProjectionMask::all(),
6206                move |b| {
6207                    let array = BooleanArray::from_iter(
6208                        filter
6209                            .iter()
6210                            .skip(filter_offset)
6211                            .take(b.num_rows())
6212                            .map(|x| Some(*x)),
6213                    );
6214                    filter_offset += b.num_rows();
6215                    Ok(array)
6216                },
6217            ))])
6218        });
6219
6220        let selection = RowSelection::from(case);
6221        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
6222
6223        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
6224            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
6225            return;
6226        }
6227        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
6228            .expect("Failed to concatenate");
6229        // selection.row_count() is not asserted against actual.num_rows(): the optional row filter can drop additional rows.
6230        let values = actual
6231            .column(0)
6232            .as_primitive::<types::Int64Type>()
6233            .iter()
6234            .collect::<Vec<_>>();
6235        let row_numbers = actual
6236            .column(1)
6237            .as_primitive::<types::Int64Type>()
6238            .iter()
6239            .collect::<Vec<_>>();
6240        assert_eq!(
6241            row_numbers
6242                .into_iter()
6243                .map(|number| number.map(|number| number + 1))
6244                .collect::<Vec<_>>(),
6245            values
6246        );
6247    }
6248
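    // Writes the values 1..=N (N chosen at random) in randomly sized batches, flushing after each
    // write so that every batch becomes its own row group.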
6249    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
6250        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
6251            "value",
6252            ArrowDataType::Int64,
6253            false,
6254        )])));
6255
6256        let mut buf = Vec::with_capacity(1024);
6257        let mut writer =
6258            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
6259
6260        let mut values = 1..=rng.random_range(1..4096);
6261        while !values.is_empty() {
6262            let batch_values = values
6263                .by_ref()
6264                .take(rng.random_range(1..4096))
6265                .collect::<Vec<_>>();
6266            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
6267            let batch =
6268                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
6269            writer.write(&batch).expect("Could not write batch");
6270            writer.flush().expect("Could not flush");
6271        }
6272        let metadata = writer.close().expect("Could not close writer");
6273
6274        (Bytes::from(buf), metadata)
6275    }
6276}