parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
72/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
79/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
80/// process, which is efficient and allows the most low level optimizations.
81///
82/// However, most Parquet based systems will apply filters at many steps prior
83/// to decoding such as pruning files, row groups and data pages. This crate
84/// provides the low level APIs needed to implement such filtering, but does not
85/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
94/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
199    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
200    /// If the batch_size more than the file row count, use the file row count.
201    pub fn with_batch_size(self, batch_size: usize) -> Self {
202        // Try to avoid allocate large buffer
203        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
204        Self { batch_size, ..self }
205    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
225    /// Configure how row selections should be materialised during execution
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
250    /// It is recommended to enable writing the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
307    pub fn with_row_filter(self, filter: RowFilter) -> Self {
308        Self {
309            filter: Some(filter),
310            ..self
311        }
312    }
313
314    /// Provide a limit to the number of rows to be read
315    ///
316    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
317    /// allowing it to limit the final set of rows decoded after any pushed down predicates
318    ///
319    /// It is recommended to enable reading the page index if using this functionality, to allow
320    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
321    pub fn with_limit(self, limit: usize) -> Self {
322        Self {
323            limit: Some(limit),
324            ..self
325        }
326    }
327
328    /// Provide an offset to skip over the given number of rows
329    ///
330    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
331    /// allowing it to skip rows after any pushed down predicates
332    ///
333    /// It is recommended to enable reading the page index if using this functionality, to allow
334    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
335    pub fn with_offset(self, offset: usize) -> Self {
336        Self {
337            offset: Some(offset),
338            ..self
339        }
340    }
341
342    /// Specify metrics collection during reading
343    ///
344    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
345    /// clone of the provided metrics to the builder.
346    ///
347    /// For example:
348    ///
349    /// ```rust
350    /// # use std::sync::Arc;
351    /// # use bytes::Bytes;
352    /// # use arrow_array::{Int32Array, RecordBatch};
353    /// # use arrow_schema::{DataType, Field, Schema};
354    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
355    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
356    /// # use parquet::arrow::ArrowWriter;
357    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
358    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
359    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
360    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
361    /// # writer.write(&batch).unwrap();
362    /// # writer.close().unwrap();
363    /// # let file = Bytes::from(file);
364    /// // Create metrics object to pass into the reader
365    /// let metrics = ArrowReaderMetrics::enabled();
366    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
367    ///   // Configure the builder to use the metrics by passing a clone
368    ///   .with_metrics(metrics.clone())
369    ///   // Build the reader
370    ///   .build().unwrap();
371    /// // .. read data from the reader ..
372    ///
373    /// // check the metrics
374    /// assert!(metrics.records_read_from_inner().is_some());
375    /// ```
376    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
377        Self { metrics, ..self }
378    }
379
380    /// Set the maximum size (per row group) of the predicate cache in bytes for
381    /// the async decoder.
382    ///
383    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
384    /// unlimited cache size.
385    ///
386    /// This cache is used to store decoded arrays that are used in
387    /// predicate evaluation ([`Self::with_row_filter`]).
388    ///
389    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
390    /// [this ticket] for more details and alternatives.
391    ///
392    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
393    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
394    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
395        Self {
396            max_predicate_cache_size,
397            ..self
398        }
399    }
400}
401
402/// Options that control how [`ParquetMetaData`] is read when constructing
403/// an Arrow reader.
404///
405/// To use these options, pass them to one of the following methods:
406/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
407/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
408///
409/// For fine-grained control over metadata loading, use
410/// [`ArrowReaderMetadata::load`] to load metadata with these options,
411///
412/// See [`ArrowReaderBuilder`] for how to configure how the column data
413/// is then read from the file, including projection and filter pushdown
414///
415/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
416#[derive(Debug, Clone, Default)]
417pub struct ArrowReaderOptions {
418    /// Should the reader strip any user defined metadata from the Arrow schema
419    skip_arrow_metadata: bool,
420    /// If provided, used as the schema hint when determining the Arrow schema,
421    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
422    ///
423    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
424    supplied_schema: Option<SchemaRef>,
425    /// Policy for reading offset and column indexes.
426    pub(crate) page_index_policy: PageIndexPolicy,
427    /// Options to control reading of Parquet metadata
428    metadata_options: ParquetMetaDataOptions,
429    /// If encryption is enabled, the file decryption properties can be provided
430    #[cfg(feature = "encryption")]
431    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
432
433    virtual_columns: Vec<FieldRef>,
434}
435
436impl ArrowReaderOptions {
437    /// Create a new [`ArrowReaderOptions`] with the default settings
438    pub fn new() -> Self {
439        Self::default()
440    }
441
442    /// Skip decoding the embedded arrow metadata (defaults to `false`)
443    ///
444    /// Parquet files generated by some writers may contain embedded arrow
445    /// schema and metadata.
446    /// This may not be correct or compatible with your system,
447    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
448    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
449        Self {
450            skip_arrow_metadata,
451            ..self
452        }
453    }
454
455    /// Provide a schema hint to use when reading the Parquet file.
456    ///
457    /// If provided, this schema takes precedence over any arrow schema embedded
458    /// in the metadata (see the [`arrow`] documentation for more details).
459    ///
460    /// If the provided schema is not compatible with the data stored in the
461    /// parquet file schema, an error will be returned when constructing the
462    /// builder.
463    ///
464    /// This option is only required if you want to explicitly control the
465    /// conversion of Parquet types to Arrow types, such as casting a column to
466    /// a different type. For example, if you wanted to read an Int64 in
467    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
468    ///
469    /// [`arrow`]: crate::arrow
470    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
471    ///
472    /// # Notes
473    ///
474    /// The provided schema must have the same number of columns as the parquet schema and
475    /// the column names must be the same.
476    ///
477    /// # Example
478    /// ```
479    /// use std::io::Bytes;
480    /// use std::sync::Arc;
481    /// use tempfile::tempfile;
482    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
483    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
484    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
485    /// use parquet::arrow::ArrowWriter;
486    ///
487    /// // Write data - schema is inferred from the data to be Int32
488    /// let file = tempfile().unwrap();
489    /// let batch = RecordBatch::try_from_iter(vec![
490    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
491    /// ]).unwrap();
492    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
493    /// writer.write(&batch).unwrap();
494    /// writer.close().unwrap();
495    ///
496    /// // Read the file back.
497    /// // Supply a schema that interprets the Int32 column as a Timestamp.
498    /// let supplied_schema = Arc::new(Schema::new(vec![
499    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
500    /// ]));
501    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
502    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
503    ///     file.try_clone().unwrap(),
504    ///     options
505    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
506    ///
507    /// // Create the reader and read the data using the supplied schema.
508    /// let mut reader = builder.build().unwrap();
509    /// let _batch = reader.next().unwrap().unwrap();
510    /// ```
511    pub fn with_schema(self, schema: SchemaRef) -> Self {
512        Self {
513            supplied_schema: Some(schema),
514            skip_arrow_metadata: true,
515            ..self
516        }
517    }
518
519    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
520    ///
521    /// The `PageIndex` can be used to push down predicates to the parquet scan,
522    /// potentially eliminating unnecessary IO, by some query engines.
523    ///
524    /// If this is enabled, [`ParquetMetaData::column_index`] and
525    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
526    /// information is present in the file.
527    ///
528    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
529    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
530    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
531    pub fn with_page_index(self, page_index: bool) -> Self {
532        let page_index_policy = PageIndexPolicy::from(page_index);
533
534        Self {
535            page_index_policy,
536            ..self
537        }
538    }
539
540    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
541    ///
542    /// See [`Self::with_page_index`] for more details.
543    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
544        Self {
545            page_index_policy: policy,
546            ..self
547        }
548    }
549
550    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
551    /// footer will be skipped.
552    ///
553    /// This can be used to avoid reparsing the schema from the file when it is
554    /// already known.
555    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
556        self.metadata_options.set_schema(schema);
557        self
558    }
559
560    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
561    /// (defaults to `false`).
562    ///
563    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
564    /// might be desirable.
565    ///
566    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
567    /// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
568    /// [`encoding_stats`]:
569    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
570    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
571        self.metadata_options.set_encoding_stats_as_mask(val);
572        self
573    }
574
575    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
576    ///
577    /// [`encoding_stats`]:
578    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
579    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
580        self.metadata_options.set_encoding_stats_policy(policy);
581        self
582    }
583
584    /// Provide the file decryption properties to use when reading encrypted parquet files.
585    ///
586    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
587    #[cfg(feature = "encryption")]
588    pub fn with_file_decryption_properties(
589        self,
590        file_decryption_properties: Arc<FileDecryptionProperties>,
591    ) -> Self {
592        Self {
593            file_decryption_properties: Some(file_decryption_properties),
594            ..self
595        }
596    }
597
598    /// Include virtual columns in the output.
599    ///
600    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers.
601    ///
602    /// # Example
603    /// ```
604    /// # use std::sync::Arc;
605    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
606    /// # use arrow_schema::{DataType, Field, Schema};
607    /// # use parquet::arrow::{ArrowWriter, RowNumber};
608    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
609    /// # use tempfile::tempfile;
610    /// #
611    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
612    /// // Create a simple record batch with some data
613    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
614    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
615    ///
616    /// // Write the batch to a temporary parquet file
617    /// let file = tempfile()?;
618    /// let mut writer = ArrowWriter::try_new(
619    ///     file.try_clone()?,
620    ///     batch.schema(),
621    ///     None
622    /// )?;
623    /// writer.write(&batch)?;
624    /// writer.close()?;
625    ///
626    /// // Create a virtual column for row numbers
627    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
628    ///     .with_extension_type(RowNumber));
629    ///
630    /// // Configure options with virtual columns
631    /// let options = ArrowReaderOptions::new()
632    ///     .with_virtual_columns(vec![row_number_field])?;
633    ///
634    /// // Create a reader with the options
635    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
636    ///     file,
637    ///     options
638    /// )?
639    /// .build()?;
640    ///
641    /// // Read the batch - it will include both the original column and the virtual row_number column
642    /// let result_batch = reader.next().unwrap()?;
643    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
644    /// assert_eq!(result_batch.num_rows(), 3);
645    /// #
646    /// # Ok(())
647    /// # }
648    /// ```
649    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
650        // Validate that all fields are virtual columns
651        for field in &virtual_columns {
652            if !is_virtual_column(field) {
653                return Err(ParquetError::General(format!(
654                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
655                    field.name()
656                )));
657            }
658        }
659        Ok(Self {
660            virtual_columns,
661            ..self
662        })
663    }
664
665    /// Retrieve the currently set page index behavior.
666    ///
667    /// This can be set via [`with_page_index`][Self::with_page_index].
668    pub fn page_index(&self) -> bool {
669        self.page_index_policy != PageIndexPolicy::Skip
670    }
671
672    /// Retrieve the currently set metadata decoding options.
673    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
674        &self.metadata_options
675    }
676
677    /// Retrieve the currently set file decryption properties.
678    ///
679    /// This can be set via
680    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
681    #[cfg(feature = "encryption")]
682    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
683        self.file_decryption_properties.as_ref()
684    }
685}
686
687/// The metadata necessary to construct a [`ArrowReaderBuilder`]
688///
689/// Note this structure is cheaply clone-able as it consists of several arcs.
690///
691/// This structure allows
692///
693/// 1. Loading metadata for a file once and then using that same metadata to
694///    construct multiple separate readers, for example, to distribute readers
695///    across multiple threads
696///
697/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
698///    from the file each time a reader is constructed.
699///
700/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
701#[derive(Debug, Clone)]
702pub struct ArrowReaderMetadata {
703    /// The Parquet Metadata, if known aprior
704    pub(crate) metadata: Arc<ParquetMetaData>,
705    /// The Arrow Schema
706    pub(crate) schema: SchemaRef,
707    /// The Parquet schema (root field)
708    pub(crate) fields: Option<Arc<ParquetField>>,
709}
710
711impl ArrowReaderMetadata {
712    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
713    /// and [`ChunkReader`]
714    ///
715    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
716    /// example of how this can be used
717    ///
718    /// # Notes
719    ///
720    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
721    /// `Self::metadata` is missing the page index, this function will attempt
722    /// to load the page index by making an object store request.
723    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
724        let metadata = ParquetMetaDataReader::new()
725            .with_page_index_policy(options.page_index_policy)
726            .with_metadata_options(Some(options.metadata_options.clone()));
727        #[cfg(feature = "encryption")]
728        let metadata = metadata.with_decryption_properties(
729            options.file_decryption_properties.as_ref().map(Arc::clone),
730        );
731        let metadata = metadata.parse_and_finish(reader)?;
732        Self::try_new(Arc::new(metadata), options)
733    }
734
735    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
736    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
737    ///
738    /// # Notes
739    ///
740    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
741    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
742    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
743        match options.supplied_schema {
744            Some(supplied_schema) => Self::with_supplied_schema(
745                metadata,
746                supplied_schema.clone(),
747                &options.virtual_columns,
748            ),
749            None => {
750                let kv_metadata = match options.skip_arrow_metadata {
751                    true => None,
752                    false => metadata.file_metadata().key_value_metadata(),
753                };
754
755                let (schema, fields) = parquet_to_arrow_schema_and_fields(
756                    metadata.file_metadata().schema_descr(),
757                    ProjectionMask::all(),
758                    kv_metadata,
759                    &options.virtual_columns,
760                )?;
761
762                Ok(Self {
763                    metadata,
764                    schema: Arc::new(schema),
765                    fields: fields.map(Arc::new),
766                })
767            }
768        }
769    }
770
771    fn with_supplied_schema(
772        metadata: Arc<ParquetMetaData>,
773        supplied_schema: SchemaRef,
774        virtual_columns: &[FieldRef],
775    ) -> Result<Self> {
776        let parquet_schema = metadata.file_metadata().schema_descr();
777        let field_levels = parquet_to_arrow_field_levels_with_virtual(
778            parquet_schema,
779            ProjectionMask::all(),
780            Some(supplied_schema.fields()),
781            virtual_columns,
782        )?;
783        let fields = field_levels.fields;
784        let inferred_len = fields.len();
785        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
786        // Ensure the supplied schema has the same number of columns as the parquet schema.
787        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
788        // different lengths, but we check here to be safe.
789        if inferred_len != supplied_len {
790            return Err(arrow_err!(format!(
791                "Incompatible supplied Arrow schema: expected {} columns received {}",
792                inferred_len, supplied_len
793            )));
794        }
795
796        let mut errors = Vec::new();
797
798        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
799
800        for (field1, field2) in field_iter {
801            if field1.data_type() != field2.data_type() {
802                errors.push(format!(
803                    "data type mismatch for field {}: requested {} but found {}",
804                    field1.name(),
805                    field1.data_type(),
806                    field2.data_type()
807                ));
808            }
809            if field1.is_nullable() != field2.is_nullable() {
810                errors.push(format!(
811                    "nullability mismatch for field {}: expected {:?} but found {:?}",
812                    field1.name(),
813                    field1.is_nullable(),
814                    field2.is_nullable()
815                ));
816            }
817            if field1.metadata() != field2.metadata() {
818                errors.push(format!(
819                    "metadata mismatch for field {}: expected {:?} but found {:?}",
820                    field1.name(),
821                    field1.metadata(),
822                    field2.metadata()
823                ));
824            }
825        }
826
827        if !errors.is_empty() {
828            let message = errors.join(", ");
829            return Err(ParquetError::ArrowError(format!(
830                "Incompatible supplied Arrow schema: {message}",
831            )));
832        }
833
834        Ok(Self {
835            metadata,
836            schema: supplied_schema,
837            fields: field_levels.levels.map(Arc::new),
838        })
839    }
840
841    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
842    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
843        &self.metadata
844    }
845
846    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
847    pub fn parquet_schema(&self) -> &SchemaDescriptor {
848        self.metadata.file_metadata().schema_descr()
849    }
850
851    /// Returns the arrow [`SchemaRef`] for this parquet file
852    pub fn schema(&self) -> &SchemaRef {
853        &self.schema
854    }
855}
856
857#[doc(hidden)]
858// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers from async
859pub struct SyncReader<T: ChunkReader>(T);
860
861impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
862    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
863        f.debug_tuple("SyncReader").field(&self.0).finish()
864    }
865}
866
867/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
868///
869/// # See Also
870/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
871/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
872/// * [`ArrowReaderBuilder`] for additional member functions
873pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
874
875impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
876    /// Create a new [`ParquetRecordBatchReaderBuilder`]
877    ///
878    /// ```
879    /// # use std::sync::Arc;
880    /// # use bytes::Bytes;
881    /// # use arrow_array::{Int32Array, RecordBatch};
882    /// # use arrow_schema::{DataType, Field, Schema};
883    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
884    /// # use parquet::arrow::ArrowWriter;
885    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
886    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
887    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
888    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
889    /// # writer.write(&batch).unwrap();
890    /// # writer.close().unwrap();
891    /// # let file = Bytes::from(file);
892    /// #
893    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
894    ///
895    /// // Inspect metadata
896    /// assert_eq!(builder.metadata().num_row_groups(), 1);
897    ///
898    /// // Construct reader
899    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
900    ///
901    /// // Read data
902    /// let _batch = reader.next().unwrap().unwrap();
903    /// ```
904    pub fn try_new(reader: T) -> Result<Self> {
905        Self::try_new_with_options(reader, Default::default())
906    }
907
908    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
909    ///
910    /// Use this method if you want to control the options for reading the
911    /// [`ParquetMetaData`]
912    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
913        let metadata = ArrowReaderMetadata::load(&reader, options)?;
914        Ok(Self::new_with_metadata(reader, metadata))
915    }
916
917    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
918    ///
919    /// Use this method if you already have [`ParquetMetaData`] for a file.
920    /// This interface allows:
921    ///
922    /// 1. Loading metadata once and using it to create multiple builders with
923    ///    potentially different settings or run on different threads
924    ///
925    /// 2. Using a cached copy of the metadata rather than re-reading it from the
926    ///    file each time a reader is constructed.
927    ///
928    /// See the docs on [`ArrowReaderMetadata`] for more details
929    ///
930    /// # Example
931    /// ```
932    /// # use std::fs::metadata;
933    /// # use std::sync::Arc;
934    /// # use bytes::Bytes;
935    /// # use arrow_array::{Int32Array, RecordBatch};
936    /// # use arrow_schema::{DataType, Field, Schema};
937    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
938    /// # use parquet::arrow::ArrowWriter;
939    /// #
940    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
941    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
942    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
943    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
944    /// # writer.write(&batch).unwrap();
945    /// # writer.close().unwrap();
946    /// # let file = Bytes::from(file);
947    /// #
948    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
949    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
950    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
951    ///
952    /// // Should be able to read from both in parallel
953    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
954    /// ```
955    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
956        Self::new_builder(SyncReader(input), metadata)
957    }
958
959    /// Read bloom filter for a column in a row group
960    ///
961    /// Returns `None` if the column does not have a bloom filter
962    ///
963    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
964    pub fn get_row_group_column_bloom_filter(
965        &self,
966        row_group_idx: usize,
967        column_idx: usize,
968    ) -> Result<Option<Sbbf>> {
969        let metadata = self.metadata.row_group(row_group_idx);
970        let column_metadata = metadata.column(column_idx);
971
972        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
973            offset
974                .try_into()
975                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
976        } else {
977            return Ok(None);
978        };
979
980        let buffer = match column_metadata.bloom_filter_length() {
981            Some(length) => self.input.0.get_bytes(offset, length as usize),
982            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
983        }?;
984
985        let (header, bitset_offset) =
986            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
987
988        match header.algorithm {
989            BloomFilterAlgorithm::BLOCK => {
990                // this match exists to future proof the singleton algorithm enum
991            }
992        }
993        match header.compression {
994            BloomFilterCompression::UNCOMPRESSED => {
995                // this match exists to future proof the singleton compression enum
996            }
997        }
998        match header.hash {
999            BloomFilterHash::XXHASH => {
1000                // this match exists to future proof the singleton hash enum
1001            }
1002        }
1003
1004        let bitset = match column_metadata.bloom_filter_length() {
1005            Some(_) => buffer.slice(
1006                (TryInto::<usize>::try_into(bitset_offset).unwrap()
1007                    - TryInto::<usize>::try_into(offset).unwrap())..,
1008            ),
1009            None => {
1010                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1011                    ParquetError::General("Bloom filter length is invalid".to_string())
1012                })?;
1013                self.input.0.get_bytes(bitset_offset, bitset_length)?
1014            }
1015        };
1016        Ok(Some(Sbbf::new(&bitset)))
1017    }
1018
1019    /// Build a [`ParquetRecordBatchReader`]
1020    ///
1021    /// Note: this will eagerly evaluate any `RowFilter` before returning
1022    pub fn build(self) -> Result<ParquetRecordBatchReader> {
1023        let Self {
1024            input,
1025            metadata,
1026            schema: _,
1027            fields,
1028            batch_size,
1029            row_groups,
1030            projection,
1031            mut filter,
1032            selection,
1033            row_selection_policy,
1034            limit,
1035            offset,
1036            metrics,
1037            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1038            max_predicate_cache_size: _,
1039        } = self;
1040
1041        // Try to avoid allocate large buffer
1042        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1043
1044        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1045
1046        let reader = ReaderRowGroups {
1047            reader: Arc::new(input.0),
1048            metadata,
1049            row_groups,
1050        };
1051
1052        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1053            .with_selection(selection)
1054            .with_row_selection_policy(row_selection_policy);
1055
1056        // Update selection based on any filters
1057        if let Some(filter) = filter.as_mut() {
1058            for predicate in filter.predicates.iter_mut() {
1059                // break early if we have ruled out all rows
1060                if !plan_builder.selects_any() {
1061                    break;
1062                }
1063
1064                let mut cache_projection = predicate.projection().clone();
1065                cache_projection.intersect(&projection);
1066
1067                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1068                    .with_parquet_metadata(&reader.metadata)
1069                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1070
1071                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1072            }
1073        }
1074
1075        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1076            .with_parquet_metadata(&reader.metadata)
1077            .build_array_reader(fields.as_deref(), &projection)?;
1078
1079        let read_plan = plan_builder
1080            .limited(reader.num_rows())
1081            .with_offset(offset)
1082            .with_limit(limit)
1083            .build_limited()
1084            .build();
1085
1086        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1087    }
1088}
1089
1090struct ReaderRowGroups<T: ChunkReader> {
1091    reader: Arc<T>,
1092
1093    metadata: Arc<ParquetMetaData>,
1094    /// Optional list of row group indices to scan
1095    row_groups: Vec<usize>,
1096}
1097
1098impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1099    fn num_rows(&self) -> usize {
1100        let meta = self.metadata.row_groups();
1101        self.row_groups
1102            .iter()
1103            .map(|x| meta[*x].num_rows() as usize)
1104            .sum()
1105    }
1106
1107    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1108        Ok(Box::new(ReaderPageIterator {
1109            column_idx: i,
1110            reader: self.reader.clone(),
1111            metadata: self.metadata.clone(),
1112            row_groups: self.row_groups.clone().into_iter(),
1113        }))
1114    }
1115
1116    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1117        Box::new(
1118            self.row_groups
1119                .iter()
1120                .map(move |i| self.metadata.row_group(*i)),
1121        )
1122    }
1123
1124    fn metadata(&self) -> &ParquetMetaData {
1125        self.metadata.as_ref()
1126    }
1127}
1128
1129struct ReaderPageIterator<T: ChunkReader> {
1130    reader: Arc<T>,
1131    column_idx: usize,
1132    row_groups: std::vec::IntoIter<usize>,
1133    metadata: Arc<ParquetMetaData>,
1134}
1135
1136impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1137    /// Return the next SerializedPageReader
1138    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1139        let rg = self.metadata.row_group(rg_idx);
1140        let column_chunk_metadata = rg.column(self.column_idx);
1141        let offset_index = self.metadata.offset_index();
1142        // `offset_index` may not exist and `i[rg_idx]` will be empty.
1143        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
1144        let page_locations = offset_index
1145            .filter(|i| !i[rg_idx].is_empty())
1146            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1147        let total_rows = rg.num_rows() as usize;
1148        let reader = self.reader.clone();
1149
1150        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1151            .add_crypto_context(
1152                rg_idx,
1153                self.column_idx,
1154                self.metadata.as_ref(),
1155                column_chunk_metadata,
1156            )
1157    }
1158}
1159
1160impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1161    type Item = Result<Box<dyn PageReader>>;
1162
1163    fn next(&mut self) -> Option<Self::Item> {
1164        let rg_idx = self.row_groups.next()?;
1165        let page_reader = self
1166            .next_page_reader(rg_idx)
1167            .map(|page_reader| Box::new(page_reader) as _);
1168        Some(page_reader)
1169    }
1170}
1171
1172impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1173
1174/// Reads Parquet data as Arrow [`RecordBatch`]es
1175///
1176/// This struct implements the [`RecordBatchReader`] trait and is an
1177/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
1178///
1179/// Typically, either reads from a file or an in memory buffer [`Bytes`]
1180///
1181/// Created by [`ParquetRecordBatchReaderBuilder`]
1182///
1183/// [`Bytes`]: bytes::Bytes
1184pub struct ParquetRecordBatchReader {
1185    array_reader: Box<dyn ArrayReader>,
1186    schema: SchemaRef,
1187    read_plan: ReadPlan,
1188}
1189
1190impl Debug for ParquetRecordBatchReader {
1191    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1192        f.debug_struct("ParquetRecordBatchReader")
1193            .field("array_reader", &"...")
1194            .field("schema", &self.schema)
1195            .field("read_plan", &self.read_plan)
1196            .finish()
1197    }
1198}
1199
1200impl Iterator for ParquetRecordBatchReader {
1201    type Item = Result<RecordBatch, ArrowError>;
1202
1203    fn next(&mut self) -> Option<Self::Item> {
1204        self.next_inner()
1205            .map_err(|arrow_err| arrow_err.into())
1206            .transpose()
1207    }
1208}
1209
1210impl ParquetRecordBatchReader {
1211    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1212    /// has reached the end of the file.
1213    ///
1214    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1215    /// simplify error handling with `?`
1216    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1217        let mut read_records = 0;
1218        let batch_size = self.batch_size();
1219        if batch_size == 0 {
1220            return Ok(None);
1221        }
1222        match self.read_plan.row_selection_cursor_mut() {
1223            RowSelectionCursor::Mask(mask_cursor) => {
1224                // Stream the record batch reader using contiguous segments of the selection
1225                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1226                while !mask_cursor.is_empty() {
1227                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1228                        return Ok(None);
1229                    };
1230
1231                    if mask_chunk.initial_skip > 0 {
1232                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1233                        if skipped != mask_chunk.initial_skip {
1234                            return Err(general_err!(
1235                                "failed to skip rows, expected {}, got {}",
1236                                mask_chunk.initial_skip,
1237                                skipped
1238                            ));
1239                        }
1240                    }
1241
1242                    if mask_chunk.chunk_rows == 0 {
1243                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1244                            return Ok(None);
1245                        }
1246                        continue;
1247                    }
1248
1249                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1250
1251                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1252                    if read == 0 {
1253                        return Err(general_err!(
1254                            "reached end of column while expecting {} rows",
1255                            mask_chunk.chunk_rows
1256                        ));
1257                    }
1258                    if read != mask_chunk.chunk_rows {
1259                        return Err(general_err!(
1260                            "insufficient rows read from array reader - expected {}, got {}",
1261                            mask_chunk.chunk_rows,
1262                            read
1263                        ));
1264                    }
1265
1266                    let array = self.array_reader.consume_batch()?;
1267                    // The column reader exposes the projection as a struct array; convert this
1268                    // into a record batch before applying the boolean filter mask.
1269                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1270                        ArrowError::ParquetError(
1271                            "Struct array reader should return struct array".to_string(),
1272                        )
1273                    })?;
1274
1275                    let filtered_batch =
1276                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1277
1278                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1279                        return Err(general_err!(
1280                            "filtered rows mismatch selection - expected {}, got {}",
1281                            mask_chunk.selected_rows,
1282                            filtered_batch.num_rows()
1283                        ));
1284                    }
1285
1286                    if filtered_batch.num_rows() == 0 {
1287                        continue;
1288                    }
1289
1290                    return Ok(Some(filtered_batch));
1291                }
1292            }
1293            RowSelectionCursor::Selectors(selectors_cursor) => {
1294                while read_records < batch_size && !selectors_cursor.is_empty() {
1295                    let front = selectors_cursor.next_selector();
1296                    if front.skip {
1297                        let skipped = self.array_reader.skip_records(front.row_count)?;
1298
1299                        if skipped != front.row_count {
1300                            return Err(general_err!(
1301                                "failed to skip rows, expected {}, got {}",
1302                                front.row_count,
1303                                skipped
1304                            ));
1305                        }
1306                        continue;
1307                    }
1308
1309                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
1310                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1311                    if front.row_count == 0 {
1312                        continue;
1313                    }
1314
1315                    // try to read record
1316                    let need_read = batch_size - read_records;
1317                    let to_read = match front.row_count.checked_sub(need_read) {
1318                        Some(remaining) if remaining != 0 => {
1319                            // if page row count less than batch_size we must set batch size to page row count.
1320                            // add check avoid dead loop
1321                            selectors_cursor.return_selector(RowSelector::select(remaining));
1322                            need_read
1323                        }
1324                        _ => front.row_count,
1325                    };
1326                    match self.array_reader.read_records(to_read)? {
1327                        0 => break,
1328                        rec => read_records += rec,
1329                    };
1330                }
1331            }
1332            RowSelectionCursor::All => {
1333                self.array_reader.read_records(batch_size)?;
1334            }
1335        };
1336
1337        let array = self.array_reader.consume_batch()?;
1338        let struct_array = array.as_struct_opt().ok_or_else(|| {
1339            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1340        })?;
1341
1342        Ok(if struct_array.len() > 0 {
1343            Some(RecordBatch::from(struct_array))
1344        } else {
1345            None
1346        })
1347    }
1348}
1349
1350impl RecordBatchReader for ParquetRecordBatchReader {
1351    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1352    ///
1353    /// Note that the schema metadata will be stripped here. See
1354    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1355    fn schema(&self) -> SchemaRef {
1356        self.schema.clone()
1357    }
1358}
1359
1360impl ParquetRecordBatchReader {
1361    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1362    ///
1363    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1364    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1365        ParquetRecordBatchReaderBuilder::try_new(reader)?
1366            .with_batch_size(batch_size)
1367            .build()
1368    }
1369
1370    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1371    ///
1372    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1373    /// higher-level interface for reading parquet data from a file
1374    pub fn try_new_with_row_groups(
1375        levels: &FieldLevels,
1376        row_groups: &dyn RowGroups,
1377        batch_size: usize,
1378        selection: Option<RowSelection>,
1379    ) -> Result<Self> {
1380        // note metrics are not supported in this API
1381        let metrics = ArrowReaderMetrics::disabled();
1382        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1383            .with_parquet_metadata(row_groups.metadata())
1384            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1385
1386        let read_plan = ReadPlanBuilder::new(batch_size)
1387            .with_selection(selection)
1388            .build();
1389
1390        Ok(Self {
1391            array_reader,
1392            schema: Arc::new(Schema::new(levels.fields.clone())),
1393            read_plan,
1394        })
1395    }
1396
1397    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1398    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1399    /// all rows will be returned
1400    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1401        let schema = match array_reader.get_data_type() {
1402            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1403            _ => unreachable!("Struct array reader's data type is not struct!"),
1404        };
1405
1406        Self {
1407            array_reader,
1408            schema: Arc::new(schema),
1409            read_plan,
1410        }
1411    }
1412
1413    #[inline(always)]
1414    pub(crate) fn batch_size(&self) -> usize {
1415        self.read_plan.batch_size()
1416    }
1417}
1418
1419#[cfg(test)]
1420pub(crate) mod tests {
1421    use std::cmp::min;
1422    use std::collections::{HashMap, VecDeque};
1423    use std::fmt::Formatter;
1424    use std::fs::File;
1425    use std::io::Seek;
1426    use std::path::PathBuf;
1427    use std::sync::Arc;
1428
1429    use rand::rngs::StdRng;
1430    use rand::{Rng, RngCore, SeedableRng, random, rng};
1431    use tempfile::tempfile;
1432
1433    use crate::arrow::arrow_reader::{
1434        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
1435        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
1436        RowSelectionPolicy, RowSelector,
1437    };
1438    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
1439    use crate::arrow::{ArrowWriter, ProjectionMask};
1440    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1441    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1442    use crate::data_type::{
1443        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1444        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1445    };
1446    use crate::errors::Result;
1447    use crate::file::metadata::{ParquetMetaData, ParquetStatisticsPolicy};
1448    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1449    use crate::file::writer::SerializedFileWriter;
1450    use crate::schema::parser::parse_message_type;
1451    use crate::schema::types::{Type, TypePtr};
1452    use crate::util::test_common::rand_gen::RandGen;
1453    use arrow::compute::kernels::cmp::eq;
1454    use arrow::compute::or;
1455    use arrow_array::builder::*;
1456    use arrow_array::cast::AsArray;
1457    use arrow_array::types::{
1458        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1459        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1460        Time64MicrosecondType,
1461    };
1462    use arrow_array::*;
1463    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1464    use arrow_data::{ArrayData, ArrayDataBuilder};
1465    use arrow_schema::{
1466        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1467    };
1468    use arrow_select::concat::concat_batches;
1469    use bytes::Bytes;
1470    use half::f16;
1471    use num_traits::PrimInt;
1472
1473    #[test]
1474    fn test_arrow_reader_all_columns() {
1475        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1476
1477        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1478        let original_schema = Arc::clone(builder.schema());
1479        let reader = builder.build().unwrap();
1480
1481        // Verify that the schema was correctly parsed
1482        assert_eq!(original_schema.fields(), reader.schema().fields());
1483    }
1484
1485    #[test]
1486    fn test_reuse_schema() {
1487        let file = get_test_file("parquet/alltypes-java.parquet");
1488
1489        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1490        let expected = builder.metadata;
1491        let schema = expected.file_metadata().schema_descr_ptr();
1492
1493        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1494        let builder =
1495            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1496
1497        // Verify that the metadata matches
1498        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1499    }
1500
1501    #[test]
1502    fn test_page_encoding_stats_mask() {
1503        let testdata = arrow::util::test_util::parquet_test_data();
1504        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1505        let file = File::open(path).unwrap();
1506
1507        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1508        let builder =
1509            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1510
1511        let row_group_metadata = builder.metadata.row_group(0);
1512
1513        // test page encoding stats
1514        let page_encoding_stats = row_group_metadata
1515            .column(0)
1516            .page_encoding_stats_mask()
1517            .unwrap();
1518        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1519        let page_encoding_stats = row_group_metadata
1520            .column(2)
1521            .page_encoding_stats_mask()
1522            .unwrap();
1523        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1524    }
1525
1526    #[test]
1527    fn test_page_encoding_stats_skipped() {
1528        let testdata = arrow::util::test_util::parquet_test_data();
1529        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1530        let file = File::open(path).unwrap();
1531
1532        // test skipping all
1533        let arrow_options =
1534            ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
1535        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1536            file.try_clone().unwrap(),
1537            arrow_options,
1538        )
1539        .unwrap();
1540
1541        let row_group_metadata = builder.metadata.row_group(0);
1542        for column in row_group_metadata.columns() {
1543            assert!(column.page_encoding_stats().is_none());
1544            assert!(column.page_encoding_stats_mask().is_none());
1545        }
1546
1547        // test skipping all but one column and converting to mask
1548        let arrow_options = ArrowReaderOptions::new()
1549            .with_encoding_stats_as_mask(true)
1550            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1551        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1552            file.try_clone().unwrap(),
1553            arrow_options,
1554        )
1555        .unwrap();
1556
1557        let row_group_metadata = builder.metadata.row_group(0);
1558        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1559            assert!(column.page_encoding_stats().is_none());
1560            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1561        }
1562    }
1563
1564    #[test]
1565    fn test_arrow_reader_single_column() {
1566        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1567
1568        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1569        let original_schema = Arc::clone(builder.schema());
1570
1571        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1572        let reader = builder.with_projection(mask).build().unwrap();
1573
1574        // Verify that the schema was correctly parsed
1575        assert_eq!(1, reader.schema().fields().len());
1576        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1577    }
1578
1579    #[test]
1580    fn test_arrow_reader_single_column_by_name() {
1581        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1582
1583        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1584        let original_schema = Arc::clone(builder.schema());
1585
1586        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1587        let reader = builder.with_projection(mask).build().unwrap();
1588
1589        // Verify that the schema was correctly parsed
1590        assert_eq!(1, reader.schema().fields().len());
1591        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1592    }
1593
1594    #[test]
1595    fn test_null_column_reader_test() {
1596        let mut file = tempfile::tempfile().unwrap();
1597
1598        let schema = "
1599            message message {
1600                OPTIONAL INT32 int32;
1601            }
1602        ";
1603        let schema = Arc::new(parse_message_type(schema).unwrap());
1604
1605        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1606        generate_single_column_file_with_data::<Int32Type>(
1607            &[vec![], vec![]],
1608            Some(&def_levels),
1609            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1610            schema,
1611            Some(Field::new("int32", ArrowDataType::Null, true)),
1612            &Default::default(),
1613        )
1614        .unwrap();
1615
1616        file.rewind().unwrap();
1617
1618        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1619        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1620
1621        assert_eq!(batches.len(), 4);
1622        for batch in &batches[0..3] {
1623            assert_eq!(batch.num_rows(), 2);
1624            assert_eq!(batch.num_columns(), 1);
1625            assert_eq!(batch.column(0).null_count(), 2);
1626        }
1627
1628        assert_eq!(batches[3].num_rows(), 1);
1629        assert_eq!(batches[3].num_columns(), 1);
1630        assert_eq!(batches[3].column(0).null_count(), 1);
1631    }
1632
1633    #[test]
1634    fn test_primitive_single_column_reader_test() {
1635        run_single_column_reader_tests::<BoolType, _, BoolType>(
1636            2,
1637            ConvertedType::NONE,
1638            None,
1639            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1640            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1641        );
1642        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1643            2,
1644            ConvertedType::NONE,
1645            None,
1646            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1647            &[
1648                Encoding::PLAIN,
1649                Encoding::RLE_DICTIONARY,
1650                Encoding::DELTA_BINARY_PACKED,
1651                Encoding::BYTE_STREAM_SPLIT,
1652            ],
1653        );
1654        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1655            2,
1656            ConvertedType::NONE,
1657            None,
1658            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1659            &[
1660                Encoding::PLAIN,
1661                Encoding::RLE_DICTIONARY,
1662                Encoding::DELTA_BINARY_PACKED,
1663                Encoding::BYTE_STREAM_SPLIT,
1664            ],
1665        );
1666        run_single_column_reader_tests::<FloatType, _, FloatType>(
1667            2,
1668            ConvertedType::NONE,
1669            None,
1670            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1671            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1672        );
1673    }
1674
1675    #[test]
1676    fn test_unsigned_primitive_single_column_reader_test() {
1677        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1678            2,
1679            ConvertedType::UINT_32,
1680            Some(ArrowDataType::UInt32),
1681            |vals| {
1682                Arc::new(UInt32Array::from_iter(
1683                    vals.iter().map(|x| x.map(|x| x as u32)),
1684                ))
1685            },
1686            &[
1687                Encoding::PLAIN,
1688                Encoding::RLE_DICTIONARY,
1689                Encoding::DELTA_BINARY_PACKED,
1690            ],
1691        );
1692        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1693            2,
1694            ConvertedType::UINT_64,
1695            Some(ArrowDataType::UInt64),
1696            |vals| {
1697                Arc::new(UInt64Array::from_iter(
1698                    vals.iter().map(|x| x.map(|x| x as u64)),
1699                ))
1700            },
1701            &[
1702                Encoding::PLAIN,
1703                Encoding::RLE_DICTIONARY,
1704                Encoding::DELTA_BINARY_PACKED,
1705            ],
1706        );
1707    }
1708
1709    #[test]
1710    fn test_unsigned_roundtrip() {
1711        let schema = Arc::new(Schema::new(vec![
1712            Field::new("uint32", ArrowDataType::UInt32, true),
1713            Field::new("uint64", ArrowDataType::UInt64, true),
1714        ]));
1715
1716        let mut buf = Vec::with_capacity(1024);
1717        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1718
1719        let original = RecordBatch::try_new(
1720            schema,
1721            vec![
1722                Arc::new(UInt32Array::from_iter_values([
1723                    0,
1724                    i32::MAX as u32,
1725                    u32::MAX,
1726                ])),
1727                Arc::new(UInt64Array::from_iter_values([
1728                    0,
1729                    i64::MAX as u64,
1730                    u64::MAX,
1731                ])),
1732            ],
1733        )
1734        .unwrap();
1735
1736        writer.write(&original).unwrap();
1737        writer.close().unwrap();
1738
1739        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1740        let ret = reader.next().unwrap().unwrap();
1741        assert_eq!(ret, original);
1742
1743        // Check they can be downcast to the correct type
1744        ret.column(0)
1745            .as_any()
1746            .downcast_ref::<UInt32Array>()
1747            .unwrap();
1748
1749        ret.column(1)
1750            .as_any()
1751            .downcast_ref::<UInt64Array>()
1752            .unwrap();
1753    }
1754
1755    #[test]
1756    fn test_float16_roundtrip() -> Result<()> {
1757        let schema = Arc::new(Schema::new(vec![
1758            Field::new("float16", ArrowDataType::Float16, false),
1759            Field::new("float16-nullable", ArrowDataType::Float16, true),
1760        ]));
1761
1762        let mut buf = Vec::with_capacity(1024);
1763        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1764
1765        let original = RecordBatch::try_new(
1766            schema,
1767            vec![
1768                Arc::new(Float16Array::from_iter_values([
1769                    f16::EPSILON,
1770                    f16::MIN,
1771                    f16::MAX,
1772                    f16::NAN,
1773                    f16::INFINITY,
1774                    f16::NEG_INFINITY,
1775                    f16::ONE,
1776                    f16::NEG_ONE,
1777                    f16::ZERO,
1778                    f16::NEG_ZERO,
1779                    f16::E,
1780                    f16::PI,
1781                    f16::FRAC_1_PI,
1782                ])),
1783                Arc::new(Float16Array::from(vec![
1784                    None,
1785                    None,
1786                    None,
1787                    Some(f16::NAN),
1788                    Some(f16::INFINITY),
1789                    Some(f16::NEG_INFINITY),
1790                    None,
1791                    None,
1792                    None,
1793                    None,
1794                    None,
1795                    None,
1796                    Some(f16::FRAC_1_PI),
1797                ])),
1798            ],
1799        )?;
1800
1801        writer.write(&original)?;
1802        writer.close()?;
1803
1804        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1805        let ret = reader.next().unwrap()?;
1806        assert_eq!(ret, original);
1807
1808        // Ensure can be downcast to the correct type
1809        ret.column(0).as_primitive::<Float16Type>();
1810        ret.column(1).as_primitive::<Float16Type>();
1811
1812        Ok(())
1813    }
1814
1815    #[test]
1816    fn test_time_utc_roundtrip() -> Result<()> {
1817        let schema = Arc::new(Schema::new(vec![
1818            Field::new(
1819                "time_millis",
1820                ArrowDataType::Time32(TimeUnit::Millisecond),
1821                true,
1822            )
1823            .with_metadata(HashMap::from_iter(vec![(
1824                "adjusted_to_utc".to_string(),
1825                "".to_string(),
1826            )])),
1827            Field::new(
1828                "time_micros",
1829                ArrowDataType::Time64(TimeUnit::Microsecond),
1830                true,
1831            )
1832            .with_metadata(HashMap::from_iter(vec![(
1833                "adjusted_to_utc".to_string(),
1834                "".to_string(),
1835            )])),
1836        ]));
1837
1838        let mut buf = Vec::with_capacity(1024);
1839        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1840
1841        let original = RecordBatch::try_new(
1842            schema,
1843            vec![
1844                Arc::new(Time32MillisecondArray::from(vec![
1845                    Some(-1),
1846                    Some(0),
1847                    Some(86_399_000),
1848                    Some(86_400_000),
1849                    Some(86_401_000),
1850                    None,
1851                ])),
1852                Arc::new(Time64MicrosecondArray::from(vec![
1853                    Some(-1),
1854                    Some(0),
1855                    Some(86_399 * 1_000_000),
1856                    Some(86_400 * 1_000_000),
1857                    Some(86_401 * 1_000_000),
1858                    None,
1859                ])),
1860            ],
1861        )?;
1862
1863        writer.write(&original)?;
1864        writer.close()?;
1865
1866        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1867        let ret = reader.next().unwrap()?;
1868        assert_eq!(ret, original);
1869
1870        // Ensure can be downcast to the correct type
1871        ret.column(0).as_primitive::<Time32MillisecondType>();
1872        ret.column(1).as_primitive::<Time64MicrosecondType>();
1873
1874        Ok(())
1875    }
1876
1877    #[test]
1878    fn test_date32_roundtrip() -> Result<()> {
1879        use arrow_array::Date32Array;
1880
1881        let schema = Arc::new(Schema::new(vec![Field::new(
1882            "date32",
1883            ArrowDataType::Date32,
1884            false,
1885        )]));
1886
1887        let mut buf = Vec::with_capacity(1024);
1888
1889        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1890
1891        let original = RecordBatch::try_new(
1892            schema,
1893            vec![Arc::new(Date32Array::from(vec![
1894                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1895            ]))],
1896        )?;
1897
1898        writer.write(&original)?;
1899        writer.close()?;
1900
1901        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1902        let ret = reader.next().unwrap()?;
1903        assert_eq!(ret, original);
1904
1905        // Ensure can be downcast to the correct type
1906        ret.column(0).as_primitive::<Date32Type>();
1907
1908        Ok(())
1909    }
1910
1911    #[test]
1912    fn test_date64_roundtrip() -> Result<()> {
1913        use arrow_array::Date64Array;
1914
1915        let schema = Arc::new(Schema::new(vec![
1916            Field::new("small-date64", ArrowDataType::Date64, false),
1917            Field::new("big-date64", ArrowDataType::Date64, false),
1918            Field::new("invalid-date64", ArrowDataType::Date64, false),
1919        ]));
1920
1921        let mut default_buf = Vec::with_capacity(1024);
1922        let mut coerce_buf = Vec::with_capacity(1024);
1923
1924        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1925
1926        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1927        let mut coerce_writer =
1928            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1929
1930        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1931
1932        let original = RecordBatch::try_new(
1933            schema,
1934            vec![
1935                // small-date64
1936                Arc::new(Date64Array::from(vec![
1937                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1938                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1939                    0,
1940                    1_000 * NUM_MILLISECONDS_IN_DAY,
1941                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1942                ])),
1943                // big-date64
1944                Arc::new(Date64Array::from(vec![
1945                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1946                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1947                    0,
1948                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1949                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1950                ])),
1951                // invalid-date64
1952                Arc::new(Date64Array::from(vec![
1953                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1954                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1955                    1,
1956                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1957                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1958                ])),
1959            ],
1960        )?;
1961
1962        default_writer.write(&original)?;
1963        coerce_writer.write(&original)?;
1964
1965        default_writer.close()?;
1966        coerce_writer.close()?;
1967
1968        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1969        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1970
1971        let default_ret = default_reader.next().unwrap()?;
1972        let coerce_ret = coerce_reader.next().unwrap()?;
1973
1974        // Roundtrip should be successful when default writer used
1975        assert_eq!(default_ret, original);
1976
1977        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1978        assert_eq!(coerce_ret.column(0), original.column(0));
1979        assert_ne!(coerce_ret.column(1), original.column(1));
1980        assert_ne!(coerce_ret.column(2), original.column(2));
1981
1982        // Ensure both can be downcast to the correct type
1983        default_ret.column(0).as_primitive::<Date64Type>();
1984        coerce_ret.column(0).as_primitive::<Date64Type>();
1985
1986        Ok(())
1987    }
1988    struct RandFixedLenGen {}
1989
1990    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1991        fn r#gen(len: i32) -> FixedLenByteArray {
1992            let mut v = vec![0u8; len as usize];
1993            rng().fill_bytes(&mut v);
1994            ByteArray::from(v).into()
1995        }
1996    }
1997
1998    #[test]
1999    fn test_fixed_length_binary_column_reader() {
2000        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2001            20,
2002            ConvertedType::NONE,
2003            None,
2004            |vals| {
2005                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2006                for val in vals {
2007                    match val {
2008                        Some(b) => builder.append_value(b).unwrap(),
2009                        None => builder.append_null(),
2010                    }
2011                }
2012                Arc::new(builder.finish())
2013            },
2014            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2015        );
2016    }
2017
2018    #[test]
2019    fn test_interval_day_time_column_reader() {
2020        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2021            12,
2022            ConvertedType::INTERVAL,
2023            None,
2024            |vals| {
2025                Arc::new(
2026                    vals.iter()
2027                        .map(|x| {
2028                            x.as_ref().map(|b| IntervalDayTime {
2029                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2030                                milliseconds: i32::from_le_bytes(
2031                                    b.as_ref()[8..12].try_into().unwrap(),
2032                                ),
2033                            })
2034                        })
2035                        .collect::<IntervalDayTimeArray>(),
2036                )
2037            },
2038            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2039        );
2040    }
2041
2042    #[test]
2043    fn test_int96_single_column_reader_test() {
2044        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2045
2046        type TypeHintAndConversionFunction =
2047            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2048
2049        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2050            // Test without a specified ArrowType hint.
2051            (None, |vals: &[Option<Int96>]| {
2052                Arc::new(TimestampNanosecondArray::from_iter(
2053                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
2054                )) as ArrayRef
2055            }),
2056            // Test other TimeUnits as ArrowType hints.
2057            (
2058                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2059                |vals: &[Option<Int96>]| {
2060                    Arc::new(TimestampSecondArray::from_iter(
2061                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
2062                    )) as ArrayRef
2063                },
2064            ),
2065            (
2066                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2067                |vals: &[Option<Int96>]| {
2068                    Arc::new(TimestampMillisecondArray::from_iter(
2069                        vals.iter().map(|x| x.map(|x| x.to_millis())),
2070                    )) as ArrayRef
2071                },
2072            ),
2073            (
2074                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2075                |vals: &[Option<Int96>]| {
2076                    Arc::new(TimestampMicrosecondArray::from_iter(
2077                        vals.iter().map(|x| x.map(|x| x.to_micros())),
2078                    )) as ArrayRef
2079                },
2080            ),
2081            (
2082                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2083                |vals: &[Option<Int96>]| {
2084                    Arc::new(TimestampNanosecondArray::from_iter(
2085                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
2086                    )) as ArrayRef
2087                },
2088            ),
2089            // Test another timezone with TimeUnit as ArrowType hints.
2090            (
2091                Some(ArrowDataType::Timestamp(
2092                    TimeUnit::Second,
2093                    Some(Arc::from("-05:00")),
2094                )),
2095                |vals: &[Option<Int96>]| {
2096                    Arc::new(
2097                        TimestampSecondArray::from_iter(
2098                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2099                        )
2100                        .with_timezone("-05:00"),
2101                    ) as ArrayRef
2102                },
2103            ),
2104        ];
2105
2106        resolutions.iter().for_each(|(arrow_type, converter)| {
2107            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2108                2,
2109                ConvertedType::NONE,
2110                arrow_type.clone(),
2111                converter,
2112                encodings,
2113            );
2114        })
2115    }
2116
2117    #[test]
2118    fn test_int96_from_spark_file_with_provided_schema() {
2119        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2120        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
2121        // microsecond resolution for the Parquet reader to interpret these values correctly.
2122        use arrow_schema::DataType::Timestamp;
2123        let test_data = arrow::util::test_util::parquet_test_data();
2124        let path = format!("{test_data}/int96_from_spark.parquet");
2125        let file = File::open(path).unwrap();
2126
2127        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2128            "a",
2129            Timestamp(TimeUnit::Microsecond, None),
2130            true,
2131        )]));
2132        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2133
2134        let mut record_reader =
2135            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2136                .unwrap()
2137                .build()
2138                .unwrap();
2139
2140        let batch = record_reader.next().unwrap().unwrap();
2141        assert_eq!(batch.num_columns(), 1);
2142        let column = batch.column(0);
2143        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2144
2145        let expected = Arc::new(Int64Array::from(vec![
2146            Some(1704141296123456),
2147            Some(1704070800000000),
2148            Some(253402225200000000),
2149            Some(1735599600000000),
2150            None,
2151            Some(9089380393200000000),
2152        ]));
2153
2154        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2155        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2156        // anyway, so this should be a zero-copy non-modifying cast.
2157
2158        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2159        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2160
2161        assert_eq!(casted_timestamps.len(), expected.len());
2162
2163        casted_timestamps
2164            .iter()
2165            .zip(expected.iter())
2166            .for_each(|(lhs, rhs)| {
2167                assert_eq!(lhs, rhs);
2168            });
2169    }
2170
2171    #[test]
2172    fn test_int96_from_spark_file_without_provided_schema() {
2173        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
2174        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
2175        // values when read as nanosecond resolution overflow and result in garbage values.
2176        use arrow_schema::DataType::Timestamp;
2177        let test_data = arrow::util::test_util::parquet_test_data();
2178        let path = format!("{test_data}/int96_from_spark.parquet");
2179        let file = File::open(path).unwrap();
2180
2181        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2182            .unwrap()
2183            .build()
2184            .unwrap();
2185
2186        let batch = record_reader.next().unwrap().unwrap();
2187        assert_eq!(batch.num_columns(), 1);
2188        let column = batch.column(0);
2189        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2190
2191        let expected = Arc::new(Int64Array::from(vec![
2192            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2193            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2194            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2195            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2196            None,
2197            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2198        ]));
2199
2200        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2201        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2202        // anyway, so this should be a zero-copy non-modifying cast.
2203
2204        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2205        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2206
2207        assert_eq!(casted_timestamps.len(), expected.len());
2208
2209        casted_timestamps
2210            .iter()
2211            .zip(expected.iter())
2212            .for_each(|(lhs, rhs)| {
2213                assert_eq!(lhs, rhs);
2214            });
2215    }
2216
2217    struct RandUtf8Gen {}
2218
2219    impl RandGen<ByteArrayType> for RandUtf8Gen {
2220        fn r#gen(len: i32) -> ByteArray {
2221            Int32Type::r#gen(len).to_string().as_str().into()
2222        }
2223    }
2224
2225    #[test]
2226    fn test_utf8_single_column_reader_test() {
2227        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2228            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2229                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2230            })))
2231        }
2232
2233        let encodings = &[
2234            Encoding::PLAIN,
2235            Encoding::RLE_DICTIONARY,
2236            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2237            Encoding::DELTA_BYTE_ARRAY,
2238        ];
2239
2240        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2241            2,
2242            ConvertedType::NONE,
2243            None,
2244            |vals| {
2245                Arc::new(BinaryArray::from_iter(
2246                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2247                ))
2248            },
2249            encodings,
2250        );
2251
2252        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2253            2,
2254            ConvertedType::UTF8,
2255            None,
2256            string_converter::<i32>,
2257            encodings,
2258        );
2259
2260        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2261            2,
2262            ConvertedType::UTF8,
2263            Some(ArrowDataType::Utf8),
2264            string_converter::<i32>,
2265            encodings,
2266        );
2267
2268        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2269            2,
2270            ConvertedType::UTF8,
2271            Some(ArrowDataType::LargeUtf8),
2272            string_converter::<i64>,
2273            encodings,
2274        );
2275
2276        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2277        for key in &small_key_types {
2278            for encoding in encodings {
2279                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2280                opts.encoding = *encoding;
2281
2282                let data_type =
2283                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2284
2285                // Cannot run full test suite as keys overflow, run small test instead
2286                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2287                    opts,
2288                    2,
2289                    ConvertedType::UTF8,
2290                    Some(data_type.clone()),
2291                    move |vals| {
2292                        let vals = string_converter::<i32>(vals);
2293                        arrow::compute::cast(&vals, &data_type).unwrap()
2294                    },
2295                );
2296            }
2297        }
2298
2299        let key_types = [
2300            ArrowDataType::Int16,
2301            ArrowDataType::UInt16,
2302            ArrowDataType::Int32,
2303            ArrowDataType::UInt32,
2304            ArrowDataType::Int64,
2305            ArrowDataType::UInt64,
2306        ];
2307
2308        for key in &key_types {
2309            let data_type =
2310                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2311
2312            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2313                2,
2314                ConvertedType::UTF8,
2315                Some(data_type.clone()),
2316                move |vals| {
2317                    let vals = string_converter::<i32>(vals);
2318                    arrow::compute::cast(&vals, &data_type).unwrap()
2319                },
2320                encodings,
2321            );
2322
2323            // https://github.com/apache/arrow-rs/issues/1179
2324            // let data_type = ArrowDataType::Dictionary(
2325            //     Box::new(key.clone()),
2326            //     Box::new(ArrowDataType::LargeUtf8),
2327            // );
2328            //
2329            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2330            //     2,
2331            //     ConvertedType::UTF8,
2332            //     Some(data_type.clone()),
2333            //     move |vals| {
2334            //         let vals = string_converter::<i64>(vals);
2335            //         arrow::compute::cast(&vals, &data_type).unwrap()
2336            //     },
2337            //     encodings,
2338            // );
2339        }
2340    }
2341
2342    #[test]
2343    fn test_decimal_nullable_struct() {
2344        let decimals = Decimal256Array::from_iter_values(
2345            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2346        );
2347
2348        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2349            "decimals",
2350            decimals.data_type().clone(),
2351            false,
2352        )])))
2353        .len(8)
2354        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2355        .child_data(vec![decimals.into_data()])
2356        .build()
2357        .unwrap();
2358
2359        let written =
2360            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2361                .unwrap();
2362
2363        let mut buffer = Vec::with_capacity(1024);
2364        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2365        writer.write(&written).unwrap();
2366        writer.close().unwrap();
2367
2368        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2369            .unwrap()
2370            .collect::<Result<Vec<_>, _>>()
2371            .unwrap();
2372
2373        assert_eq!(&written.slice(0, 3), &read[0]);
2374        assert_eq!(&written.slice(3, 3), &read[1]);
2375        assert_eq!(&written.slice(6, 2), &read[2]);
2376    }
2377
2378    #[test]
2379    fn test_int32_nullable_struct() {
2380        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2381        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2382            "int32",
2383            int32.data_type().clone(),
2384            false,
2385        )])))
2386        .len(8)
2387        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2388        .child_data(vec![int32.into_data()])
2389        .build()
2390        .unwrap();
2391
2392        let written =
2393            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2394                .unwrap();
2395
2396        let mut buffer = Vec::with_capacity(1024);
2397        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2398        writer.write(&written).unwrap();
2399        writer.close().unwrap();
2400
2401        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2402            .unwrap()
2403            .collect::<Result<Vec<_>, _>>()
2404            .unwrap();
2405
2406        assert_eq!(&written.slice(0, 3), &read[0]);
2407        assert_eq!(&written.slice(3, 3), &read[1]);
2408        assert_eq!(&written.slice(6, 2), &read[2]);
2409    }
2410
2411    #[test]
2412    fn test_decimal_list() {
2413        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2414
2415        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2416        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2417            decimals.data_type().clone(),
2418            false,
2419        ))))
2420        .len(7)
2421        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2422        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2423        .child_data(vec![decimals.into_data()])
2424        .build()
2425        .unwrap();
2426
2427        let written =
2428            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2429                .unwrap();
2430
2431        let mut buffer = Vec::with_capacity(1024);
2432        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2433        writer.write(&written).unwrap();
2434        writer.close().unwrap();
2435
2436        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2437            .unwrap()
2438            .collect::<Result<Vec<_>, _>>()
2439            .unwrap();
2440
2441        assert_eq!(&written.slice(0, 3), &read[0]);
2442        assert_eq!(&written.slice(3, 3), &read[1]);
2443        assert_eq!(&written.slice(6, 1), &read[2]);
2444    }
2445
2446    #[test]
2447    fn test_read_decimal_file() {
2448        use arrow_array::Decimal128Array;
2449        let testdata = arrow::util::test_util::parquet_test_data();
2450        let file_variants = vec![
2451            ("byte_array", 4),
2452            ("fixed_length", 25),
2453            ("int32", 4),
2454            ("int64", 10),
2455        ];
2456        for (prefix, target_precision) in file_variants {
2457            let path = format!("{testdata}/{prefix}_decimal.parquet");
2458            let file = File::open(path).unwrap();
2459            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2460
2461            let batch = record_reader.next().unwrap().unwrap();
2462            assert_eq!(batch.num_rows(), 24);
2463            let col = batch
2464                .column(0)
2465                .as_any()
2466                .downcast_ref::<Decimal128Array>()
2467                .unwrap();
2468
2469            let expected = 1..25;
2470
2471            assert_eq!(col.precision(), target_precision);
2472            assert_eq!(col.scale(), 2);
2473
2474            for (i, v) in expected.enumerate() {
2475                assert_eq!(col.value(i), v * 100_i128);
2476            }
2477        }
2478    }
2479
2480    #[test]
2481    fn test_read_float16_nonzeros_file() {
2482        use arrow_array::Float16Array;
2483        let testdata = arrow::util::test_util::parquet_test_data();
2484        // see https://github.com/apache/parquet-testing/pull/40
2485        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2486        let file = File::open(path).unwrap();
2487        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2488
2489        let batch = record_reader.next().unwrap().unwrap();
2490        assert_eq!(batch.num_rows(), 8);
2491        let col = batch
2492            .column(0)
2493            .as_any()
2494            .downcast_ref::<Float16Array>()
2495            .unwrap();
2496
2497        let f16_two = f16::ONE + f16::ONE;
2498
2499        assert_eq!(col.null_count(), 1);
2500        assert!(col.is_null(0));
2501        assert_eq!(col.value(1), f16::ONE);
2502        assert_eq!(col.value(2), -f16_two);
2503        assert!(col.value(3).is_nan());
2504        assert_eq!(col.value(4), f16::ZERO);
2505        assert!(col.value(4).is_sign_positive());
2506        assert_eq!(col.value(5), f16::NEG_ONE);
2507        assert_eq!(col.value(6), f16::NEG_ZERO);
2508        assert!(col.value(6).is_sign_negative());
2509        assert_eq!(col.value(7), f16_two);
2510    }
2511
2512    #[test]
2513    fn test_read_float16_zeros_file() {
2514        use arrow_array::Float16Array;
2515        let testdata = arrow::util::test_util::parquet_test_data();
2516        // see https://github.com/apache/parquet-testing/pull/40
2517        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2518        let file = File::open(path).unwrap();
2519        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2520
2521        let batch = record_reader.next().unwrap().unwrap();
2522        assert_eq!(batch.num_rows(), 3);
2523        let col = batch
2524            .column(0)
2525            .as_any()
2526            .downcast_ref::<Float16Array>()
2527            .unwrap();
2528
2529        assert_eq!(col.null_count(), 1);
2530        assert!(col.is_null(0));
2531        assert_eq!(col.value(1), f16::ZERO);
2532        assert!(col.value(1).is_sign_positive());
2533        assert!(col.value(2).is_nan());
2534    }
2535
2536    #[test]
2537    fn test_read_float32_float64_byte_stream_split() {
2538        let path = format!(
2539            "{}/byte_stream_split.zstd.parquet",
2540            arrow::util::test_util::parquet_test_data(),
2541        );
2542        let file = File::open(path).unwrap();
2543        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2544
2545        let mut row_count = 0;
2546        for batch in record_reader {
2547            let batch = batch.unwrap();
2548            row_count += batch.num_rows();
2549            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2550            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2551
2552            // This file contains floats from a standard normal distribution
2553            for &x in f32_col.values() {
2554                assert!(x > -10.0);
2555                assert!(x < 10.0);
2556            }
2557            for &x in f64_col.values() {
2558                assert!(x > -10.0);
2559                assert!(x < 10.0);
2560            }
2561        }
2562        assert_eq!(row_count, 300);
2563    }
2564
2565    #[test]
2566    fn test_read_extended_byte_stream_split() {
2567        let path = format!(
2568            "{}/byte_stream_split_extended.gzip.parquet",
2569            arrow::util::test_util::parquet_test_data(),
2570        );
2571        let file = File::open(path).unwrap();
2572        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2573
2574        let mut row_count = 0;
2575        for batch in record_reader {
2576            let batch = batch.unwrap();
2577            row_count += batch.num_rows();
2578
2579            // 0,1 are f16
2580            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2581            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2582            assert_eq!(f16_col.len(), f16_bss.len());
2583            f16_col
2584                .iter()
2585                .zip(f16_bss.iter())
2586                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2587
2588            // 2,3 are f32
2589            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2590            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2591            assert_eq!(f32_col.len(), f32_bss.len());
2592            f32_col
2593                .iter()
2594                .zip(f32_bss.iter())
2595                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2596
2597            // 4,5 are f64
2598            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2599            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2600            assert_eq!(f64_col.len(), f64_bss.len());
2601            f64_col
2602                .iter()
2603                .zip(f64_bss.iter())
2604                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2605
2606            // 6,7 are i32
2607            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2608            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2609            assert_eq!(i32_col.len(), i32_bss.len());
2610            i32_col
2611                .iter()
2612                .zip(i32_bss.iter())
2613                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2614
2615            // 8,9 are i64
2616            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2617            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2618            assert_eq!(i64_col.len(), i64_bss.len());
2619            i64_col
2620                .iter()
2621                .zip(i64_bss.iter())
2622                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2623
2624            // 10,11 are FLBA(5)
2625            let flba_col = batch.column(10).as_fixed_size_binary();
2626            let flba_bss = batch.column(11).as_fixed_size_binary();
2627            assert_eq!(flba_col.len(), flba_bss.len());
2628            flba_col
2629                .iter()
2630                .zip(flba_bss.iter())
2631                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2632
2633            // 12,13 are FLBA(4) (decimal(7,3))
2634            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2635            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2636            assert_eq!(dec_col.len(), dec_bss.len());
2637            dec_col
2638                .iter()
2639                .zip(dec_bss.iter())
2640                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2641        }
2642        assert_eq!(row_count, 200);
2643    }
2644
2645    #[test]
2646    fn test_read_incorrect_map_schema_file() {
2647        let testdata = arrow::util::test_util::parquet_test_data();
2648        // see https://github.com/apache/parquet-testing/pull/47
2649        let path = format!("{testdata}/incorrect_map_schema.parquet");
2650        let file = File::open(path).unwrap();
2651        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2652
2653        let batch = record_reader.next().unwrap().unwrap();
2654        assert_eq!(batch.num_rows(), 1);
2655
2656        let expected_schema = Schema::new(vec![Field::new(
2657            "my_map",
2658            ArrowDataType::Map(
2659                Arc::new(Field::new(
2660                    "key_value",
2661                    ArrowDataType::Struct(Fields::from(vec![
2662                        Field::new("key", ArrowDataType::Utf8, false),
2663                        Field::new("value", ArrowDataType::Utf8, true),
2664                    ])),
2665                    false,
2666                )),
2667                false,
2668            ),
2669            true,
2670        )]);
2671        assert_eq!(batch.schema().as_ref(), &expected_schema);
2672
2673        assert_eq!(batch.num_rows(), 1);
2674        assert_eq!(batch.column(0).null_count(), 0);
2675        assert_eq!(
2676            batch.column(0).as_map().keys().as_ref(),
2677            &StringArray::from(vec!["parent", "name"])
2678        );
2679        assert_eq!(
2680            batch.column(0).as_map().values().as_ref(),
2681            &StringArray::from(vec!["another", "report"])
2682        );
2683    }
2684
2685    #[test]
2686    fn test_read_dict_fixed_size_binary() {
2687        let schema = Arc::new(Schema::new(vec![Field::new(
2688            "a",
2689            ArrowDataType::Dictionary(
2690                Box::new(ArrowDataType::UInt8),
2691                Box::new(ArrowDataType::FixedSizeBinary(8)),
2692            ),
2693            true,
2694        )]));
2695        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2696        let values = FixedSizeBinaryArray::try_from_iter(
2697            vec![
2698                (0u8..8u8).collect::<Vec<u8>>(),
2699                (24u8..32u8).collect::<Vec<u8>>(),
2700            ]
2701            .into_iter(),
2702        )
2703        .unwrap();
2704        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2705        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2706
2707        let mut buffer = Vec::with_capacity(1024);
2708        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2709        writer.write(&batch).unwrap();
2710        writer.close().unwrap();
2711        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2712            .unwrap()
2713            .collect::<Result<Vec<_>, _>>()
2714            .unwrap();
2715
2716        assert_eq!(read.len(), 1);
2717        assert_eq!(&batch, &read[0])
2718    }
2719
2720    #[test]
2721    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2722        // the `StructArrayReader` will check the definition and repetition levels of the first
2723        // child column in the struct to determine nullability for the struct. If the first
2724        // column's is being read by `ByteArrayDictionaryReader` we need to ensure that the
2725        // nullability is interpreted  correctly from the rep/def level buffers managed by the
2726        // buffers managed by this array reader.
2727
2728        let struct_fields = Fields::from(vec![
2729            Field::new(
2730                "city",
2731                ArrowDataType::Dictionary(
2732                    Box::new(ArrowDataType::UInt8),
2733                    Box::new(ArrowDataType::Utf8),
2734                ),
2735                true,
2736            ),
2737            Field::new("name", ArrowDataType::Utf8, true),
2738        ]);
2739        let schema = Arc::new(Schema::new(vec![Field::new(
2740            "items",
2741            ArrowDataType::Struct(struct_fields.clone()),
2742            true,
2743        )]));
2744
2745        let items_arr = StructArray::new(
2746            struct_fields,
2747            vec![
2748                Arc::new(DictionaryArray::new(
2749                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2750                    Arc::new(StringArray::from_iter_values(vec![
2751                        "quebec",
2752                        "fredericton",
2753                        "halifax",
2754                    ])),
2755                )),
2756                Arc::new(StringArray::from_iter_values(vec![
2757                    "albert", "terry", "lance", "", "tim",
2758                ])),
2759            ],
2760            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2761        );
2762
2763        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2764        let mut buffer = Vec::with_capacity(1024);
2765        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2766        writer.write(&batch).unwrap();
2767        writer.close().unwrap();
2768        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2769            .unwrap()
2770            .collect::<Result<Vec<_>, _>>()
2771            .unwrap();
2772
2773        assert_eq!(read.len(), 1);
2774        assert_eq!(&batch, &read[0])
2775    }
2776
2777    /// Parameters for single_column_reader_test
2778    #[derive(Clone)]
2779    struct TestOptions {
2780        /// Number of row group to write to parquet (row group size =
2781        /// num_row_groups / num_rows)
2782        num_row_groups: usize,
2783        /// Total number of rows per row group
2784        num_rows: usize,
2785        /// Size of batches to read back
2786        record_batch_size: usize,
2787        /// Percentage of nulls in column or None if required
2788        null_percent: Option<usize>,
2789        /// Set write batch size
2790        ///
2791        /// This is the number of rows that are written at once to a page and
2792        /// therefore acts as a bound on the page granularity of a row group
2793        write_batch_size: usize,
2794        /// Maximum size of page in bytes
2795        max_data_page_size: usize,
2796        /// Maximum size of dictionary page in bytes
2797        max_dict_page_size: usize,
2798        /// Writer version
2799        writer_version: WriterVersion,
2800        /// Enabled statistics
2801        enabled_statistics: EnabledStatistics,
2802        /// Encoding
2803        encoding: Encoding,
2804        /// row selections and total selected row count
2805        row_selections: Option<(RowSelection, usize)>,
2806        /// row filter
2807        row_filter: Option<Vec<bool>>,
2808        /// limit
2809        limit: Option<usize>,
2810        /// offset
2811        offset: Option<usize>,
2812    }
2813
2814    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
2815    impl std::fmt::Debug for TestOptions {
2816        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2817            f.debug_struct("TestOptions")
2818                .field("num_row_groups", &self.num_row_groups)
2819                .field("num_rows", &self.num_rows)
2820                .field("record_batch_size", &self.record_batch_size)
2821                .field("null_percent", &self.null_percent)
2822                .field("write_batch_size", &self.write_batch_size)
2823                .field("max_data_page_size", &self.max_data_page_size)
2824                .field("max_dict_page_size", &self.max_dict_page_size)
2825                .field("writer_version", &self.writer_version)
2826                .field("enabled_statistics", &self.enabled_statistics)
2827                .field("encoding", &self.encoding)
2828                .field("row_selections", &self.row_selections.is_some())
2829                .field("row_filter", &self.row_filter.is_some())
2830                .field("limit", &self.limit)
2831                .field("offset", &self.offset)
2832                .finish()
2833        }
2834    }
2835
2836    impl Default for TestOptions {
2837        fn default() -> Self {
2838            Self {
2839                num_row_groups: 2,
2840                num_rows: 100,
2841                record_batch_size: 15,
2842                null_percent: None,
2843                write_batch_size: 64,
2844                max_data_page_size: 1024 * 1024,
2845                max_dict_page_size: 1024 * 1024,
2846                writer_version: WriterVersion::PARQUET_1_0,
2847                enabled_statistics: EnabledStatistics::Page,
2848                encoding: Encoding::PLAIN,
2849                row_selections: None,
2850                row_filter: None,
2851                limit: None,
2852                offset: None,
2853            }
2854        }
2855    }
2856
2857    impl TestOptions {
2858        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2859            Self {
2860                num_row_groups,
2861                num_rows,
2862                record_batch_size,
2863                ..Default::default()
2864            }
2865        }
2866
2867        fn with_null_percent(self, null_percent: usize) -> Self {
2868            Self {
2869                null_percent: Some(null_percent),
2870                ..self
2871            }
2872        }
2873
2874        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2875            Self {
2876                max_data_page_size,
2877                ..self
2878            }
2879        }
2880
2881        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2882            Self {
2883                max_dict_page_size,
2884                ..self
2885            }
2886        }
2887
2888        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2889            Self {
2890                enabled_statistics,
2891                ..self
2892            }
2893        }
2894
2895        fn with_row_selections(self) -> Self {
2896            assert!(self.row_filter.is_none(), "Must set row selection first");
2897
2898            let mut rng = rng();
2899            let step = rng.random_range(self.record_batch_size..self.num_rows);
2900            let row_selections = create_test_selection(
2901                step,
2902                self.num_row_groups * self.num_rows,
2903                rng.random::<bool>(),
2904            );
2905            Self {
2906                row_selections: Some(row_selections),
2907                ..self
2908            }
2909        }
2910
2911        fn with_row_filter(self) -> Self {
2912            let row_count = match &self.row_selections {
2913                Some((_, count)) => *count,
2914                None => self.num_row_groups * self.num_rows,
2915            };
2916
2917            let mut rng = rng();
2918            Self {
2919                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2920                ..self
2921            }
2922        }
2923
2924        fn with_limit(self, limit: usize) -> Self {
2925            Self {
2926                limit: Some(limit),
2927                ..self
2928            }
2929        }
2930
2931        fn with_offset(self, offset: usize) -> Self {
2932            Self {
2933                offset: Some(offset),
2934                ..self
2935            }
2936        }
2937
2938        fn writer_props(&self) -> WriterProperties {
2939            let builder = WriterProperties::builder()
2940                .set_data_page_size_limit(self.max_data_page_size)
2941                .set_write_batch_size(self.write_batch_size)
2942                .set_writer_version(self.writer_version)
2943                .set_statistics_enabled(self.enabled_statistics);
2944
2945            let builder = match self.encoding {
2946                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2947                    .set_dictionary_enabled(true)
2948                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2949                _ => builder
2950                    .set_dictionary_enabled(false)
2951                    .set_encoding(self.encoding),
2952            };
2953
2954            builder.build()
2955        }
2956    }
2957
2958    /// Create a parquet file and then read it using
2959    /// `ParquetFileArrowReader` using a standard set of parameters
2960    /// `opts`.
2961    ///
2962    /// `rand_max` represents the maximum size of value to pass to to
2963    /// value generator
2964    fn run_single_column_reader_tests<T, F, G>(
2965        rand_max: i32,
2966        converted_type: ConvertedType,
2967        arrow_type: Option<ArrowDataType>,
2968        converter: F,
2969        encodings: &[Encoding],
2970    ) where
2971        T: DataType,
2972        G: RandGen<T>,
2973        F: Fn(&[Option<T::T>]) -> ArrayRef,
2974    {
2975        let all_options = vec![
2976            // choose record_batch_batch (15) so batches cross row
2977            // group boundaries (50 rows in 2 row groups) cases.
2978            TestOptions::new(2, 100, 15),
2979            // choose record_batch_batch (5) so batches sometime fall
2980            // on row group boundaries and (25 rows in 3 row groups
2981            // --> row groups of 10, 10, and 5). Tests buffer
2982            // refilling edge cases.
2983            TestOptions::new(3, 25, 5),
2984            // Choose record_batch_size (25) so all batches fall
2985            // exactly on row group boundary (25). Tests buffer
2986            // refilling edge cases.
2987            TestOptions::new(4, 100, 25),
2988            // Set maximum page size so row groups have multiple pages
2989            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2990            // Set small dictionary page size to test dictionary fallback
2991            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2992            // Test optional but with no nulls
2993            TestOptions::new(2, 256, 127).with_null_percent(0),
2994            // Test optional with nulls
2995            TestOptions::new(2, 256, 93).with_null_percent(25),
2996            // Test with limit of 0
2997            TestOptions::new(4, 100, 25).with_limit(0),
2998            // Test with limit of 50
2999            TestOptions::new(4, 100, 25).with_limit(50),
3000            // Test with limit equal to number of rows
3001            TestOptions::new(4, 100, 25).with_limit(10),
3002            // Test with limit larger than number of rows
3003            TestOptions::new(4, 100, 25).with_limit(101),
3004            // Test with limit + offset equal to number of rows
3005            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3006            // Test with limit + offset equal to number of rows
3007            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3008            // Test with limit + offset larger than number of rows
3009            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3010            // Test with no page-level statistics
3011            TestOptions::new(2, 256, 91)
3012                .with_null_percent(25)
3013                .with_enabled_statistics(EnabledStatistics::Chunk),
3014            // Test with no statistics
3015            TestOptions::new(2, 256, 91)
3016                .with_null_percent(25)
3017                .with_enabled_statistics(EnabledStatistics::None),
3018            // Test with all null
3019            TestOptions::new(2, 128, 91)
3020                .with_null_percent(100)
3021                .with_enabled_statistics(EnabledStatistics::None),
3022            // Test skip
3023
3024            // choose record_batch_batch (15) so batches cross row
3025            // group boundaries (50 rows in 2 row groups) cases.
3026            TestOptions::new(2, 100, 15).with_row_selections(),
3027            // choose record_batch_batch (5) so batches sometime fall
3028            // on row group boundaries and (25 rows in 3 row groups
3029            // --> row groups of 10, 10, and 5). Tests buffer
3030            // refilling edge cases.
3031            TestOptions::new(3, 25, 5).with_row_selections(),
3032            // Choose record_batch_size (25) so all batches fall
3033            // exactly on row group boundary (25). Tests buffer
3034            // refilling edge cases.
3035            TestOptions::new(4, 100, 25).with_row_selections(),
3036            // Set maximum page size so row groups have multiple pages
3037            TestOptions::new(3, 256, 73)
3038                .with_max_data_page_size(128)
3039                .with_row_selections(),
3040            // Set small dictionary page size to test dictionary fallback
3041            TestOptions::new(3, 256, 57)
3042                .with_max_dict_page_size(128)
3043                .with_row_selections(),
3044            // Test optional but with no nulls
3045            TestOptions::new(2, 256, 127)
3046                .with_null_percent(0)
3047                .with_row_selections(),
3048            // Test optional with nulls
3049            TestOptions::new(2, 256, 93)
3050                .with_null_percent(25)
3051                .with_row_selections(),
3052            // Test optional with nulls
3053            TestOptions::new(2, 256, 93)
3054                .with_null_percent(25)
3055                .with_row_selections()
3056                .with_limit(10),
3057            // Test optional with nulls
3058            TestOptions::new(2, 256, 93)
3059                .with_null_percent(25)
3060                .with_row_selections()
3061                .with_offset(20)
3062                .with_limit(10),
3063            // Test filter
3064
3065            // Test with row filter
3066            TestOptions::new(4, 100, 25).with_row_filter(),
3067            // Test with row selection and row filter
3068            TestOptions::new(4, 100, 25)
3069                .with_row_selections()
3070                .with_row_filter(),
3071            // Test with nulls and row filter
3072            TestOptions::new(2, 256, 93)
3073                .with_null_percent(25)
3074                .with_max_data_page_size(10)
3075                .with_row_filter(),
3076            // Test with nulls and row filter and small pages
3077            TestOptions::new(2, 256, 93)
3078                .with_null_percent(25)
3079                .with_max_data_page_size(10)
3080                .with_row_selections()
3081                .with_row_filter(),
3082            // Test with row selection and no offset index and small pages
3083            TestOptions::new(2, 256, 93)
3084                .with_enabled_statistics(EnabledStatistics::None)
3085                .with_max_data_page_size(10)
3086                .with_row_selections(),
3087        ];
3088
3089        all_options.into_iter().for_each(|opts| {
3090            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3091                for encoding in encodings {
3092                    let opts = TestOptions {
3093                        writer_version,
3094                        encoding: *encoding,
3095                        ..opts.clone()
3096                    };
3097
3098                    single_column_reader_test::<T, _, G>(
3099                        opts,
3100                        rand_max,
3101                        converted_type,
3102                        arrow_type.clone(),
3103                        &converter,
3104                    )
3105                }
3106            }
3107        });
3108    }
3109
3110    /// Create a parquet file and then read it using
3111    /// `ParquetFileArrowReader` using the parameters described in
3112    /// `opts`.
3113    fn single_column_reader_test<T, F, G>(
3114        opts: TestOptions,
3115        rand_max: i32,
3116        converted_type: ConvertedType,
3117        arrow_type: Option<ArrowDataType>,
3118        converter: F,
3119    ) where
3120        T: DataType,
3121        G: RandGen<T>,
3122        F: Fn(&[Option<T::T>]) -> ArrayRef,
3123    {
3124        // Print out options to facilitate debugging failures on CI
3125        println!(
3126            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3127            T::get_physical_type(),
3128            converted_type,
3129            arrow_type,
3130            opts
3131        );
3132
3133        //according to null_percent generate def_levels
3134        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3135            Some(null_percent) => {
3136                let mut rng = rng();
3137
3138                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3139                    .map(|_| {
3140                        std::iter::from_fn(|| {
3141                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3142                        })
3143                        .take(opts.num_rows)
3144                        .collect()
3145                    })
3146                    .collect();
3147                (Repetition::OPTIONAL, Some(def_levels))
3148            }
3149            None => (Repetition::REQUIRED, None),
3150        };
3151
3152        //generate random table data
3153        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3154            .map(|idx| {
3155                let null_count = match def_levels.as_ref() {
3156                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3157                    None => 0,
3158                };
3159                G::gen_vec(rand_max, opts.num_rows - null_count)
3160            })
3161            .collect();
3162
3163        let len = match T::get_physical_type() {
3164            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3165            crate::basic::Type::INT96 => 12,
3166            _ => -1,
3167        };
3168
3169        let fields = vec![Arc::new(
3170            Type::primitive_type_builder("leaf", T::get_physical_type())
3171                .with_repetition(repetition)
3172                .with_converted_type(converted_type)
3173                .with_length(len)
3174                .build()
3175                .unwrap(),
3176        )];
3177
3178        let schema = Arc::new(
3179            Type::group_type_builder("test_schema")
3180                .with_fields(fields)
3181                .build()
3182                .unwrap(),
3183        );
3184
3185        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3186
3187        let mut file = tempfile::tempfile().unwrap();
3188
3189        generate_single_column_file_with_data::<T>(
3190            &values,
3191            def_levels.as_ref(),
3192            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3193            schema,
3194            arrow_field,
3195            &opts,
3196        )
3197        .unwrap();
3198
3199        file.rewind().unwrap();
3200
3201        let options = ArrowReaderOptions::new()
3202            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
3203
3204        let mut builder =
3205            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3206
3207        let expected_data = match opts.row_selections {
3208            Some((selections, row_count)) => {
3209                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3210
3211                let mut skip_data: Vec<Option<T::T>> = vec![];
3212                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3213                for select in dequeue {
3214                    if select.skip {
3215                        without_skip_data.drain(0..select.row_count);
3216                    } else {
3217                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3218                    }
3219                }
3220                builder = builder.with_row_selection(selections);
3221
3222                assert_eq!(skip_data.len(), row_count);
3223                skip_data
3224            }
3225            None => {
3226                //get flatten table data
3227                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3228                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3229                expected_data
3230            }
3231        };
3232
3233        let mut expected_data = match opts.row_filter {
3234            Some(filter) => {
3235                let expected_data = expected_data
3236                    .into_iter()
3237                    .zip(filter.iter())
3238                    .filter_map(|(d, f)| f.then(|| d))
3239                    .collect();
3240
3241                let mut filter_offset = 0;
3242                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3243                    ProjectionMask::all(),
3244                    move |b| {
3245                        let array = BooleanArray::from_iter(
3246                            filter
3247                                .iter()
3248                                .skip(filter_offset)
3249                                .take(b.num_rows())
3250                                .map(|x| Some(*x)),
3251                        );
3252                        filter_offset += b.num_rows();
3253                        Ok(array)
3254                    },
3255                ))]);
3256
3257                builder = builder.with_row_filter(filter);
3258                expected_data
3259            }
3260            None => expected_data,
3261        };
3262
3263        if let Some(offset) = opts.offset {
3264            builder = builder.with_offset(offset);
3265            expected_data = expected_data.into_iter().skip(offset).collect();
3266        }
3267
3268        if let Some(limit) = opts.limit {
3269            builder = builder.with_limit(limit);
3270            expected_data = expected_data.into_iter().take(limit).collect();
3271        }
3272
3273        let mut record_reader = builder
3274            .with_batch_size(opts.record_batch_size)
3275            .build()
3276            .unwrap();
3277
3278        let mut total_read = 0;
3279        loop {
3280            let maybe_batch = record_reader.next();
3281            if total_read < expected_data.len() {
3282                let end = min(total_read + opts.record_batch_size, expected_data.len());
3283                let batch = maybe_batch.unwrap().unwrap();
3284                assert_eq!(end - total_read, batch.num_rows());
3285
3286                let a = converter(&expected_data[total_read..end]);
3287                let b = batch.column(0);
3288
3289                assert_eq!(a.data_type(), b.data_type());
3290                assert_eq!(a.to_data(), b.to_data());
3291                assert_eq!(
3292                    a.as_any().type_id(),
3293                    b.as_any().type_id(),
3294                    "incorrect type ids"
3295                );
3296
3297                total_read = end;
3298            } else {
3299                assert!(maybe_batch.is_none());
3300                break;
3301            }
3302        }
3303    }
3304
3305    fn gen_expected_data<T: DataType>(
3306        def_levels: Option<&Vec<Vec<i16>>>,
3307        values: &[Vec<T::T>],
3308    ) -> Vec<Option<T::T>> {
3309        let data: Vec<Option<T::T>> = match def_levels {
3310            Some(levels) => {
3311                let mut values_iter = values.iter().flatten();
3312                levels
3313                    .iter()
3314                    .flatten()
3315                    .map(|d| match d {
3316                        1 => Some(values_iter.next().cloned().unwrap()),
3317                        0 => None,
3318                        _ => unreachable!(),
3319                    })
3320                    .collect()
3321            }
3322            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3323        };
3324        data
3325    }
3326
3327    fn generate_single_column_file_with_data<T: DataType>(
3328        values: &[Vec<T::T>],
3329        def_levels: Option<&Vec<Vec<i16>>>,
3330        file: File,
3331        schema: TypePtr,
3332        field: Option<Field>,
3333        opts: &TestOptions,
3334    ) -> Result<ParquetMetaData> {
3335        let mut writer_props = opts.writer_props();
3336        if let Some(field) = field {
3337            let arrow_schema = Schema::new(vec![field]);
3338            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3339        }
3340
3341        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3342
3343        for (idx, v) in values.iter().enumerate() {
3344            let def_levels = def_levels.map(|d| d[idx].as_slice());
3345            let mut row_group_writer = writer.next_row_group()?;
3346            {
3347                let mut column_writer = row_group_writer
3348                    .next_column()?
3349                    .expect("Column writer is none!");
3350
3351                column_writer
3352                    .typed::<T>()
3353                    .write_batch(v, def_levels, None)?;
3354
3355                column_writer.close()?;
3356            }
3357            row_group_writer.close()?;
3358        }
3359
3360        writer.close()
3361    }
3362
3363    fn get_test_file(file_name: &str) -> File {
3364        let mut path = PathBuf::new();
3365        path.push(arrow::util::test_util::arrow_test_data());
3366        path.push(file_name);
3367
3368        File::open(path.as_path()).expect("File not found!")
3369    }
3370
3371    #[test]
3372    fn test_read_structs() {
3373        // This particular test file has columns of struct types where there is
3374        // a column that has the same name as one of the struct fields
3375        // (see: ARROW-11452)
3376        let testdata = arrow::util::test_util::parquet_test_data();
3377        let path = format!("{testdata}/nested_structs.rust.parquet");
3378        let file = File::open(&path).unwrap();
3379        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3380
3381        for batch in record_batch_reader {
3382            batch.unwrap();
3383        }
3384
3385        let file = File::open(&path).unwrap();
3386        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3387
3388        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3389        let projected_reader = builder
3390            .with_projection(mask)
3391            .with_batch_size(60)
3392            .build()
3393            .unwrap();
3394
3395        let expected_schema = Schema::new(vec![
3396            Field::new(
3397                "roll_num",
3398                ArrowDataType::Struct(Fields::from(vec![Field::new(
3399                    "count",
3400                    ArrowDataType::UInt64,
3401                    false,
3402                )])),
3403                false,
3404            ),
3405            Field::new(
3406                "PC_CUR",
3407                ArrowDataType::Struct(Fields::from(vec![
3408                    Field::new("mean", ArrowDataType::Int64, false),
3409                    Field::new("sum", ArrowDataType::Int64, false),
3410                ])),
3411                false,
3412            ),
3413        ]);
3414
3415        // Tests for #1652 and #1654
3416        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3417
3418        for batch in projected_reader {
3419            let batch = batch.unwrap();
3420            assert_eq!(batch.schema().as_ref(), &expected_schema);
3421        }
3422    }
3423
3424    #[test]
3425    // same as test_read_structs but constructs projection mask via column names
3426    fn test_read_structs_by_name() {
3427        let testdata = arrow::util::test_util::parquet_test_data();
3428        let path = format!("{testdata}/nested_structs.rust.parquet");
3429        let file = File::open(&path).unwrap();
3430        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3431
3432        for batch in record_batch_reader {
3433            batch.unwrap();
3434        }
3435
3436        let file = File::open(&path).unwrap();
3437        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3438
3439        let mask = ProjectionMask::columns(
3440            builder.parquet_schema(),
3441            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3442        );
3443        let projected_reader = builder
3444            .with_projection(mask)
3445            .with_batch_size(60)
3446            .build()
3447            .unwrap();
3448
3449        let expected_schema = Schema::new(vec![
3450            Field::new(
3451                "roll_num",
3452                ArrowDataType::Struct(Fields::from(vec![Field::new(
3453                    "count",
3454                    ArrowDataType::UInt64,
3455                    false,
3456                )])),
3457                false,
3458            ),
3459            Field::new(
3460                "PC_CUR",
3461                ArrowDataType::Struct(Fields::from(vec![
3462                    Field::new("mean", ArrowDataType::Int64, false),
3463                    Field::new("sum", ArrowDataType::Int64, false),
3464                ])),
3465                false,
3466            ),
3467        ]);
3468
3469        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3470
3471        for batch in projected_reader {
3472            let batch = batch.unwrap();
3473            assert_eq!(batch.schema().as_ref(), &expected_schema);
3474        }
3475    }
3476
3477    #[test]
3478    fn test_read_maps() {
3479        let testdata = arrow::util::test_util::parquet_test_data();
3480        let path = format!("{testdata}/nested_maps.snappy.parquet");
3481        let file = File::open(path).unwrap();
3482        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3483
3484        for batch in record_batch_reader {
3485            batch.unwrap();
3486        }
3487    }
3488
3489    #[test]
3490    fn test_nested_nullability() {
3491        let message_type = "message nested {
3492          OPTIONAL Group group {
3493            REQUIRED INT32 leaf;
3494          }
3495        }";
3496
3497        let file = tempfile::tempfile().unwrap();
3498        let schema = Arc::new(parse_message_type(message_type).unwrap());
3499
3500        {
3501            // Write using low-level parquet API (#1167)
3502            let mut writer =
3503                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3504                    .unwrap();
3505
3506            {
3507                let mut row_group_writer = writer.next_row_group().unwrap();
3508                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3509
3510                column_writer
3511                    .typed::<Int32Type>()
3512                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3513                    .unwrap();
3514
3515                column_writer.close().unwrap();
3516                row_group_writer.close().unwrap();
3517            }
3518
3519            writer.close().unwrap();
3520        }
3521
3522        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3523        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3524
3525        let reader = builder.with_projection(mask).build().unwrap();
3526
3527        let expected_schema = Schema::new(vec![Field::new(
3528            "group",
3529            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3530            true,
3531        )]);
3532
3533        let batch = reader.into_iter().next().unwrap().unwrap();
3534        assert_eq!(batch.schema().as_ref(), &expected_schema);
3535        assert_eq!(batch.num_rows(), 4);
3536        assert_eq!(batch.column(0).null_count(), 2);
3537    }
3538
3539    #[test]
3540    fn test_invalid_utf8() {
3541        // a parquet file with 1 column with invalid utf8
3542        let data = vec![
3543            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3544            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3545            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3546            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3547            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3548            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3549            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3550            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3551            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3552            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3553            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3554            82, 49,
3555        ];
3556
3557        let file = Bytes::from(data);
3558        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3559
3560        let error = record_batch_reader.next().unwrap().unwrap_err();
3561
3562        assert!(
3563            error.to_string().contains("invalid utf-8 sequence"),
3564            "{}",
3565            error
3566        );
3567    }
3568
3569    #[test]
3570    fn test_invalid_utf8_string_array() {
3571        test_invalid_utf8_string_array_inner::<i32>();
3572    }
3573
3574    #[test]
3575    fn test_invalid_utf8_large_string_array() {
3576        test_invalid_utf8_string_array_inner::<i64>();
3577    }
3578
3579    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3580        let cases = [
3581            invalid_utf8_first_char::<O>(),
3582            invalid_utf8_first_char_long_strings::<O>(),
3583            invalid_utf8_later_char::<O>(),
3584            invalid_utf8_later_char_long_strings::<O>(),
3585            invalid_utf8_later_char_really_long_strings::<O>(),
3586            invalid_utf8_later_char_really_long_strings2::<O>(),
3587        ];
3588        for array in &cases {
3589            for encoding in STRING_ENCODINGS {
3590                // data is not valid utf8 we can not construct a correct StringArray
3591                // safely, so purposely create an invalid StringArray
3592                let array = unsafe {
3593                    GenericStringArray::<O>::new_unchecked(
3594                        array.offsets().clone(),
3595                        array.values().clone(),
3596                        array.nulls().cloned(),
3597                    )
3598                };
3599                let data_type = array.data_type().clone();
3600                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3601                let err = read_from_parquet(data).unwrap_err();
3602                let expected_err =
3603                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3604                assert!(
3605                    err.to_string().contains(expected_err),
3606                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3607                );
3608            }
3609        }
3610    }
3611
3612    #[test]
3613    fn test_invalid_utf8_string_view_array() {
3614        let cases = [
3615            invalid_utf8_first_char::<i32>(),
3616            invalid_utf8_first_char_long_strings::<i32>(),
3617            invalid_utf8_later_char::<i32>(),
3618            invalid_utf8_later_char_long_strings::<i32>(),
3619            invalid_utf8_later_char_really_long_strings::<i32>(),
3620            invalid_utf8_later_char_really_long_strings2::<i32>(),
3621        ];
3622
3623        for encoding in STRING_ENCODINGS {
3624            for array in &cases {
3625                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3626                let array = array.as_binary_view();
3627
3628                // data is not valid utf8 we can not construct a correct StringArray
3629                // safely, so purposely create an invalid StringViewArray
3630                let array = unsafe {
3631                    StringViewArray::new_unchecked(
3632                        array.views().clone(),
3633                        array.data_buffers().to_vec(),
3634                        array.nulls().cloned(),
3635                    )
3636                };
3637
3638                let data_type = array.data_type().clone();
3639                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3640                let err = read_from_parquet(data).unwrap_err();
3641                let expected_err =
3642                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3643                assert!(
3644                    err.to_string().contains(expected_err),
3645                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3646                );
3647            }
3648        }
3649    }
3650
3651    /// Encodings suitable for string data
3652    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3653        None,
3654        Some(Encoding::PLAIN),
3655        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3656        Some(Encoding::DELTA_BYTE_ARRAY),
3657    ];
3658
3659    /// Invalid Utf-8 sequence in the first character
3660    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3661    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3662
3663    /// Invalid Utf=8 sequence in NOT the first character
3664    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3665    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3666
3667    /// returns a BinaryArray with invalid UTF8 data in the first character
3668    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3669        let valid: &[u8] = b"   ";
3670        let invalid = INVALID_UTF8_FIRST_CHAR;
3671        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3672    }
3673
3674    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3675    /// string larger than 12 bytes which is handled specially when reading
3676    /// `ByteViewArray`s
3677    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3678        let valid: &[u8] = b"   ";
3679        let mut invalid = vec![];
3680        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3681        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3682        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3683    }
3684
3685    /// returns a BinaryArray with invalid UTF8 data in a character other than
3686    /// the first (this is checked in a special codepath)
3687    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3688        let valid: &[u8] = b"   ";
3689        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3690        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3691    }
3692
3693    /// returns a BinaryArray with invalid UTF8 data in a character other than
3694    /// the first in a string larger than 12 bytes which is handled specially
3695    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3696    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3697        let valid: &[u8] = b"   ";
3698        let mut invalid = vec![];
3699        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3700        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3701        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3702    }
3703
3704    /// returns a BinaryArray with invalid UTF8 data in a character other than
3705    /// the first in a string larger than 128 bytes which is handled specially
3706    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3707    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3708        let valid: &[u8] = b"   ";
3709        let mut invalid = vec![];
3710        for _ in 0..10 {
3711            // each instance is 38 bytes
3712            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3713        }
3714        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3715        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3716    }
3717
3718    /// returns a BinaryArray with small invalid UTF8 data followed by a large
3719    /// invalid UTF8 data in a character other than the first in a string larger
3720    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3721        let valid: &[u8] = b"   ";
3722        let mut valid_long = vec![];
3723        for _ in 0..10 {
3724            // each instance is 38 bytes
3725            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3726        }
3727        let invalid = INVALID_UTF8_LATER_CHAR;
3728        GenericBinaryArray::<O>::from_iter(vec![
3729            None,
3730            Some(valid),
3731            Some(invalid),
3732            None,
3733            Some(&valid_long),
3734            Some(valid),
3735        ])
3736    }
3737
3738    /// writes the array into a single column parquet file with the specified
3739    /// encoding.
3740    ///
3741    /// If no encoding is specified, use default (dictionary) encoding
3742    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3743        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3744        let mut data = vec![];
3745        let schema = batch.schema();
3746        let props = encoding.map(|encoding| {
3747            WriterProperties::builder()
3748                // must disable dictionary encoding to actually use encoding
3749                .set_dictionary_enabled(false)
3750                .set_encoding(encoding)
3751                .build()
3752        });
3753
3754        {
3755            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3756            writer.write(&batch).unwrap();
3757            writer.flush().unwrap();
3758            writer.close().unwrap();
3759        };
3760        data
3761    }
3762
3763    /// read the parquet file into a record batch
3764    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3765        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3766            .unwrap()
3767            .build()
3768            .unwrap();
3769
3770        reader.collect()
3771    }
3772
3773    #[test]
3774    fn test_dictionary_preservation() {
3775        let fields = vec![Arc::new(
3776            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3777                .with_repetition(Repetition::OPTIONAL)
3778                .with_converted_type(ConvertedType::UTF8)
3779                .build()
3780                .unwrap(),
3781        )];
3782
3783        let schema = Arc::new(
3784            Type::group_type_builder("test_schema")
3785                .with_fields(fields)
3786                .build()
3787                .unwrap(),
3788        );
3789
3790        let dict_type = ArrowDataType::Dictionary(
3791            Box::new(ArrowDataType::Int32),
3792            Box::new(ArrowDataType::Utf8),
3793        );
3794
3795        let arrow_field = Field::new("leaf", dict_type, true);
3796
3797        let mut file = tempfile::tempfile().unwrap();
3798
3799        let values = vec![
3800            vec![
3801                ByteArray::from("hello"),
3802                ByteArray::from("a"),
3803                ByteArray::from("b"),
3804                ByteArray::from("d"),
3805            ],
3806            vec![
3807                ByteArray::from("c"),
3808                ByteArray::from("a"),
3809                ByteArray::from("b"),
3810            ],
3811        ];
3812
3813        let def_levels = vec![
3814            vec![1, 0, 0, 1, 0, 0, 1, 1],
3815            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3816        ];
3817
3818        let opts = TestOptions {
3819            encoding: Encoding::RLE_DICTIONARY,
3820            ..Default::default()
3821        };
3822
3823        generate_single_column_file_with_data::<ByteArrayType>(
3824            &values,
3825            Some(&def_levels),
3826            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3827            schema,
3828            Some(arrow_field),
3829            &opts,
3830        )
3831        .unwrap();
3832
3833        file.rewind().unwrap();
3834
3835        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3836
3837        let batches = record_reader
3838            .collect::<Result<Vec<RecordBatch>, _>>()
3839            .unwrap();
3840
3841        assert_eq!(batches.len(), 6);
3842        assert!(batches.iter().all(|x| x.num_columns() == 1));
3843
3844        let row_counts = batches
3845            .iter()
3846            .map(|x| (x.num_rows(), x.column(0).null_count()))
3847            .collect::<Vec<_>>();
3848
3849        assert_eq!(
3850            row_counts,
3851            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3852        );
3853
3854        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3855
3856        // First and second batch in same row group -> same dictionary
3857        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3858        // Third batch spans row group -> computed dictionary
3859        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3860        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3861        // Fourth, fifth and sixth from same row group -> same dictionary
3862        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3863        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3864    }
3865
3866    #[test]
3867    fn test_read_null_list() {
3868        let testdata = arrow::util::test_util::parquet_test_data();
3869        let path = format!("{testdata}/null_list.parquet");
3870        let file = File::open(path).unwrap();
3871        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3872
3873        let batch = record_batch_reader.next().unwrap().unwrap();
3874        assert_eq!(batch.num_rows(), 1);
3875        assert_eq!(batch.num_columns(), 1);
3876        assert_eq!(batch.column(0).len(), 1);
3877
3878        let list = batch
3879            .column(0)
3880            .as_any()
3881            .downcast_ref::<ListArray>()
3882            .unwrap();
3883        assert_eq!(list.len(), 1);
3884        assert!(list.is_valid(0));
3885
3886        let val = list.value(0);
3887        assert_eq!(val.len(), 0);
3888    }
3889
3890    #[test]
3891    fn test_null_schema_inference() {
3892        let testdata = arrow::util::test_util::parquet_test_data();
3893        let path = format!("{testdata}/null_list.parquet");
3894        let file = File::open(path).unwrap();
3895
3896        let arrow_field = Field::new(
3897            "emptylist",
3898            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3899            true,
3900        );
3901
3902        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3903        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3904        let schema = builder.schema();
3905        assert_eq!(schema.fields().len(), 1);
3906        assert_eq!(schema.field(0), &arrow_field);
3907    }
3908
3909    #[test]
3910    fn test_skip_metadata() {
3911        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3912        let field = Field::new("col", col.data_type().clone(), true);
3913
3914        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3915
3916        let metadata = [("key".to_string(), "value".to_string())]
3917            .into_iter()
3918            .collect();
3919
3920        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3921
3922        assert_ne!(schema_with_metadata, schema_without_metadata);
3923
3924        let batch =
3925            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3926
3927        let file = |version: WriterVersion| {
3928            let props = WriterProperties::builder()
3929                .set_writer_version(version)
3930                .build();
3931
3932            let file = tempfile().unwrap();
3933            let mut writer =
3934                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3935                    .unwrap();
3936            writer.write(&batch).unwrap();
3937            writer.close().unwrap();
3938            file
3939        };
3940
3941        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3942
3943        let v1_reader = file(WriterVersion::PARQUET_1_0);
3944        let v2_reader = file(WriterVersion::PARQUET_2_0);
3945
3946        let arrow_reader =
3947            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3948        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3949
3950        let reader =
3951            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3952                .unwrap()
3953                .build()
3954                .unwrap();
3955        assert_eq!(reader.schema(), schema_without_metadata);
3956
3957        let arrow_reader =
3958            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3959        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3960
3961        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3962            .unwrap()
3963            .build()
3964            .unwrap();
3965        assert_eq!(reader.schema(), schema_without_metadata);
3966    }
3967
3968    fn write_parquet_from_iter<I, F>(value: I) -> File
3969    where
3970        I: IntoIterator<Item = (F, ArrayRef)>,
3971        F: AsRef<str>,
3972    {
3973        let batch = RecordBatch::try_from_iter(value).unwrap();
3974        let file = tempfile().unwrap();
3975        let mut writer =
3976            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3977        writer.write(&batch).unwrap();
3978        writer.close().unwrap();
3979        file
3980    }
3981
3982    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3983    where
3984        I: IntoIterator<Item = (F, ArrayRef)>,
3985        F: AsRef<str>,
3986    {
3987        let file = write_parquet_from_iter(value);
3988        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3989        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3990            file.try_clone().unwrap(),
3991            options_with_schema,
3992        );
3993        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3994    }
3995
3996    #[test]
3997    fn test_schema_too_few_columns() {
3998        run_schema_test_with_error(
3999            vec![
4000                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4001                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4002            ],
4003            Arc::new(Schema::new(vec![Field::new(
4004                "int64",
4005                ArrowDataType::Int64,
4006                false,
4007            )])),
4008            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4009        );
4010    }
4011
4012    #[test]
4013    fn test_schema_too_many_columns() {
4014        run_schema_test_with_error(
4015            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4016            Arc::new(Schema::new(vec![
4017                Field::new("int64", ArrowDataType::Int64, false),
4018                Field::new("int32", ArrowDataType::Int32, false),
4019            ])),
4020            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4021        );
4022    }
4023
4024    #[test]
4025    fn test_schema_mismatched_column_names() {
4026        run_schema_test_with_error(
4027            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4028            Arc::new(Schema::new(vec![Field::new(
4029                "other",
4030                ArrowDataType::Int64,
4031                false,
4032            )])),
4033            "Arrow: incompatible arrow schema, expected field named int64 got other",
4034        );
4035    }
4036
4037    #[test]
4038    fn test_schema_incompatible_columns() {
4039        run_schema_test_with_error(
4040            vec![
4041                (
4042                    "col1_invalid",
4043                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4044                ),
4045                (
4046                    "col2_valid",
4047                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4048                ),
4049                (
4050                    "col3_invalid",
4051                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4052                ),
4053            ],
4054            Arc::new(Schema::new(vec![
4055                Field::new("col1_invalid", ArrowDataType::Int32, false),
4056                Field::new("col2_valid", ArrowDataType::Int32, false),
4057                Field::new("col3_invalid", ArrowDataType::Int32, false),
4058            ])),
4059            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4060        );
4061    }
4062
4063    #[test]
4064    fn test_one_incompatible_nested_column() {
4065        let nested_fields = Fields::from(vec![
4066            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4067            Field::new("nested1_invalid", ArrowDataType::Int64, false),
4068        ]);
4069        let nested = StructArray::try_new(
4070            nested_fields,
4071            vec![
4072                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4073                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4074            ],
4075            None,
4076        )
4077        .expect("struct array");
4078        let supplied_nested_fields = Fields::from(vec![
4079            Field::new("nested1_valid", ArrowDataType::Utf8, false),
4080            Field::new("nested1_invalid", ArrowDataType::Int32, false),
4081        ]);
4082        run_schema_test_with_error(
4083            vec![
4084                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4085                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4086                ("nested", Arc::new(nested) as ArrayRef),
4087            ],
4088            Arc::new(Schema::new(vec![
4089                Field::new("col1", ArrowDataType::Int64, false),
4090                Field::new("col2", ArrowDataType::Int32, false),
4091                Field::new(
4092                    "nested",
4093                    ArrowDataType::Struct(supplied_nested_fields),
4094                    false,
4095                ),
4096            ])),
4097            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4098            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4099            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4100        );
4101    }
4102
4103    /// Return parquet data with a single column of utf8 strings
4104    fn utf8_parquet() -> Bytes {
4105        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4106        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4107        let props = None;
4108        // write parquet file with non nullable strings
4109        let mut parquet_data = vec![];
4110        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4111        writer.write(&batch).unwrap();
4112        writer.close().unwrap();
4113        Bytes::from(parquet_data)
4114    }
4115
4116    #[test]
4117    fn test_schema_error_bad_types() {
4118        // verify incompatible schemas error on read
4119        let parquet_data = utf8_parquet();
4120
4121        // Ask to read it back with an incompatible schema (int vs string)
4122        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4123            "column1",
4124            arrow::datatypes::DataType::Int32,
4125            false,
4126        )]));
4127
4128        // read it back out
4129        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4130        let err =
4131            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4132                .unwrap_err();
4133        assert_eq!(
4134            err.to_string(),
4135            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4136        )
4137    }
4138
4139    #[test]
4140    fn test_schema_error_bad_nullability() {
4141        // verify incompatible schemas error on read
4142        let parquet_data = utf8_parquet();
4143
4144        // Ask to read it back with an incompatible schema (nullability mismatch)
4145        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4146            "column1",
4147            arrow::datatypes::DataType::Utf8,
4148            true,
4149        )]));
4150
4151        // read it back out
4152        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4153        let err =
4154            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4155                .unwrap_err();
4156        assert_eq!(
4157            err.to_string(),
4158            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4159        )
4160    }
4161
4162    #[test]
4163    fn test_read_binary_as_utf8() {
4164        let file = write_parquet_from_iter(vec![
4165            (
4166                "binary_to_utf8",
4167                Arc::new(BinaryArray::from(vec![
4168                    b"one".as_ref(),
4169                    b"two".as_ref(),
4170                    b"three".as_ref(),
4171                ])) as ArrayRef,
4172            ),
4173            (
4174                "large_binary_to_large_utf8",
4175                Arc::new(LargeBinaryArray::from(vec![
4176                    b"one".as_ref(),
4177                    b"two".as_ref(),
4178                    b"three".as_ref(),
4179                ])) as ArrayRef,
4180            ),
4181            (
4182                "binary_view_to_utf8_view",
4183                Arc::new(BinaryViewArray::from(vec![
4184                    b"one".as_ref(),
4185                    b"two".as_ref(),
4186                    b"three".as_ref(),
4187                ])) as ArrayRef,
4188            ),
4189        ]);
4190        let supplied_fields = Fields::from(vec![
4191            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4192            Field::new(
4193                "large_binary_to_large_utf8",
4194                ArrowDataType::LargeUtf8,
4195                false,
4196            ),
4197            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4198        ]);
4199
4200        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4201        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4202            file.try_clone().unwrap(),
4203            options,
4204        )
4205        .expect("reader builder with schema")
4206        .build()
4207        .expect("reader with schema");
4208
4209        let batch = arrow_reader.next().unwrap().unwrap();
4210        assert_eq!(batch.num_columns(), 3);
4211        assert_eq!(batch.num_rows(), 3);
4212        assert_eq!(
4213            batch
4214                .column(0)
4215                .as_string::<i32>()
4216                .iter()
4217                .collect::<Vec<_>>(),
4218            vec![Some("one"), Some("two"), Some("three")]
4219        );
4220
4221        assert_eq!(
4222            batch
4223                .column(1)
4224                .as_string::<i64>()
4225                .iter()
4226                .collect::<Vec<_>>(),
4227            vec![Some("one"), Some("two"), Some("three")]
4228        );
4229
4230        assert_eq!(
4231            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4232            vec![Some("one"), Some("two"), Some("three")]
4233        );
4234    }
4235
4236    #[test]
4237    #[should_panic(expected = "Invalid UTF8 sequence at")]
4238    fn test_read_non_utf8_binary_as_utf8() {
4239        let file = write_parquet_from_iter(vec![(
4240            "non_utf8_binary",
4241            Arc::new(BinaryArray::from(vec![
4242                b"\xDE\x00\xFF".as_ref(),
4243                b"\xDE\x01\xAA".as_ref(),
4244                b"\xDE\x02\xFF".as_ref(),
4245            ])) as ArrayRef,
4246        )]);
4247        let supplied_fields = Fields::from(vec![Field::new(
4248            "non_utf8_binary",
4249            ArrowDataType::Utf8,
4250            false,
4251        )]);
4252
4253        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4254        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4255            file.try_clone().unwrap(),
4256            options,
4257        )
4258        .expect("reader builder with schema")
4259        .build()
4260        .expect("reader with schema");
4261        arrow_reader.next().unwrap().unwrap_err();
4262    }
4263
4264    #[test]
4265    fn test_with_schema() {
4266        let nested_fields = Fields::from(vec![
4267            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4268            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4269        ]);
4270
4271        let nested_arrays: Vec<ArrayRef> = vec![
4272            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4273            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4274        ];
4275
4276        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4277
4278        let file = write_parquet_from_iter(vec![
4279            (
4280                "int32_to_ts_second",
4281                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4282            ),
4283            (
4284                "date32_to_date64",
4285                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4286            ),
4287            ("nested", Arc::new(nested) as ArrayRef),
4288        ]);
4289
4290        let supplied_nested_fields = Fields::from(vec![
4291            Field::new(
4292                "utf8_to_dict",
4293                ArrowDataType::Dictionary(
4294                    Box::new(ArrowDataType::Int32),
4295                    Box::new(ArrowDataType::Utf8),
4296                ),
4297                false,
4298            ),
4299            Field::new(
4300                "int64_to_ts_nano",
4301                ArrowDataType::Timestamp(
4302                    arrow::datatypes::TimeUnit::Nanosecond,
4303                    Some("+10:00".into()),
4304                ),
4305                false,
4306            ),
4307        ]);
4308
4309        let supplied_schema = Arc::new(Schema::new(vec![
4310            Field::new(
4311                "int32_to_ts_second",
4312                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4313                false,
4314            ),
4315            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4316            Field::new(
4317                "nested",
4318                ArrowDataType::Struct(supplied_nested_fields),
4319                false,
4320            ),
4321        ]));
4322
4323        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4324        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4325            file.try_clone().unwrap(),
4326            options,
4327        )
4328        .expect("reader builder with schema")
4329        .build()
4330        .expect("reader with schema");
4331
4332        assert_eq!(arrow_reader.schema(), supplied_schema);
4333        let batch = arrow_reader.next().unwrap().unwrap();
4334        assert_eq!(batch.num_columns(), 3);
4335        assert_eq!(batch.num_rows(), 4);
4336        assert_eq!(
4337            batch
4338                .column(0)
4339                .as_any()
4340                .downcast_ref::<TimestampSecondArray>()
4341                .expect("downcast to timestamp second")
4342                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4343                .map(|v| v.to_string())
4344                .expect("value as datetime"),
4345            "1970-01-01 01:00:00 +01:00"
4346        );
4347        assert_eq!(
4348            batch
4349                .column(1)
4350                .as_any()
4351                .downcast_ref::<Date64Array>()
4352                .expect("downcast to date64")
4353                .value_as_date(0)
4354                .map(|v| v.to_string())
4355                .expect("value as date"),
4356            "1970-01-01"
4357        );
4358
4359        let nested = batch
4360            .column(2)
4361            .as_any()
4362            .downcast_ref::<StructArray>()
4363            .expect("downcast to struct");
4364
4365        let nested_dict = nested
4366            .column(0)
4367            .as_any()
4368            .downcast_ref::<Int32DictionaryArray>()
4369            .expect("downcast to dictionary");
4370
4371        assert_eq!(
4372            nested_dict
4373                .values()
4374                .as_any()
4375                .downcast_ref::<StringArray>()
4376                .expect("downcast to string")
4377                .iter()
4378                .collect::<Vec<_>>(),
4379            vec![Some("a"), Some("b")]
4380        );
4381
4382        assert_eq!(
4383            nested_dict.keys().iter().collect::<Vec<_>>(),
4384            vec![Some(0), Some(0), Some(0), Some(1)]
4385        );
4386
4387        assert_eq!(
4388            nested
4389                .column(1)
4390                .as_any()
4391                .downcast_ref::<TimestampNanosecondArray>()
4392                .expect("downcast to timestamp nanosecond")
4393                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4394                .map(|v| v.to_string())
4395                .expect("value as datetime"),
4396            "1970-01-01 10:00:00.000000001 +10:00"
4397        );
4398    }
4399
4400    #[test]
4401    fn test_empty_projection() {
4402        let testdata = arrow::util::test_util::parquet_test_data();
4403        let path = format!("{testdata}/alltypes_plain.parquet");
4404        let file = File::open(path).unwrap();
4405
4406        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4407        let file_metadata = builder.metadata().file_metadata();
4408        let expected_rows = file_metadata.num_rows() as usize;
4409
4410        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4411        let batch_reader = builder
4412            .with_projection(mask)
4413            .with_batch_size(2)
4414            .build()
4415            .unwrap();
4416
4417        let mut total_rows = 0;
4418        for maybe_batch in batch_reader {
4419            let batch = maybe_batch.unwrap();
4420            total_rows += batch.num_rows();
4421            assert_eq!(batch.num_columns(), 0);
4422            assert!(batch.num_rows() <= 2);
4423        }
4424
4425        assert_eq!(total_rows, expected_rows);
4426    }
4427
4428    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4429        let schema = Arc::new(Schema::new(vec![Field::new(
4430            "list",
4431            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4432            true,
4433        )]));
4434
4435        let mut buf = Vec::with_capacity(1024);
4436
4437        let mut writer = ArrowWriter::try_new(
4438            &mut buf,
4439            schema.clone(),
4440            Some(
4441                WriterProperties::builder()
4442                    .set_max_row_group_size(row_group_size)
4443                    .build(),
4444            ),
4445        )
4446        .unwrap();
4447        for _ in 0..2 {
4448            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4449            for _ in 0..(batch_size) {
4450                list_builder.append(true);
4451            }
4452            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4453                .unwrap();
4454            writer.write(&batch).unwrap();
4455        }
4456        writer.close().unwrap();
4457
4458        let mut record_reader =
4459            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4460        assert_eq!(
4461            batch_size,
4462            record_reader.next().unwrap().unwrap().num_rows()
4463        );
4464        assert_eq!(
4465            batch_size,
4466            record_reader.next().unwrap().unwrap().num_rows()
4467        );
4468    }
4469
4470    #[test]
4471    fn test_row_group_exact_multiple() {
4472        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4473        test_row_group_batch(8, 8);
4474        test_row_group_batch(10, 8);
4475        test_row_group_batch(8, 10);
4476        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4477        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4478        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4479        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4480        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4481    }
4482
4483    /// Given a RecordBatch containing all the column data, return the expected batches given
4484    /// a `batch_size` and `selection`
4485    fn get_expected_batches(
4486        column: &RecordBatch,
4487        selection: &RowSelection,
4488        batch_size: usize,
4489    ) -> Vec<RecordBatch> {
4490        let mut expected_batches = vec![];
4491
4492        let mut selection: VecDeque<_> = selection.clone().into();
4493        let mut row_offset = 0;
4494        let mut last_start = None;
4495        while row_offset < column.num_rows() && !selection.is_empty() {
4496            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4497            while batch_remaining > 0 && !selection.is_empty() {
4498                let (to_read, skip) = match selection.front_mut() {
4499                    Some(selection) if selection.row_count > batch_remaining => {
4500                        selection.row_count -= batch_remaining;
4501                        (batch_remaining, selection.skip)
4502                    }
4503                    Some(_) => {
4504                        let select = selection.pop_front().unwrap();
4505                        (select.row_count, select.skip)
4506                    }
4507                    None => break,
4508                };
4509
4510                batch_remaining -= to_read;
4511
4512                match skip {
4513                    true => {
4514                        if let Some(last_start) = last_start.take() {
4515                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4516                        }
4517                        row_offset += to_read
4518                    }
4519                    false => {
4520                        last_start.get_or_insert(row_offset);
4521                        row_offset += to_read
4522                    }
4523                }
4524            }
4525        }
4526
4527        if let Some(last_start) = last_start.take() {
4528            expected_batches.push(column.slice(last_start, row_offset - last_start))
4529        }
4530
4531        // Sanity check, all batches except the final should be the batch size
4532        for batch in &expected_batches[..expected_batches.len() - 1] {
4533            assert_eq!(batch.num_rows(), batch_size);
4534        }
4535
4536        expected_batches
4537    }
4538
4539    fn create_test_selection(
4540        step_len: usize,
4541        total_len: usize,
4542        skip_first: bool,
4543    ) -> (RowSelection, usize) {
4544        let mut remaining = total_len;
4545        let mut skip = skip_first;
4546        let mut vec = vec![];
4547        let mut selected_count = 0;
4548        while remaining != 0 {
4549            let step = if remaining > step_len {
4550                step_len
4551            } else {
4552                remaining
4553            };
4554            vec.push(RowSelector {
4555                row_count: step,
4556                skip,
4557            });
4558            remaining -= step;
4559            if !skip {
4560                selected_count += step;
4561            }
4562            skip = !skip;
4563        }
4564        (vec.into(), selected_count)
4565    }
4566
4567    #[test]
4568    fn test_scan_row_with_selection() {
4569        let testdata = arrow::util::test_util::parquet_test_data();
4570        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4571        let test_file = File::open(&path).unwrap();
4572
4573        let mut serial_reader =
4574            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4575        let data = serial_reader.next().unwrap().unwrap();
4576
4577        let do_test = |batch_size: usize, selection_len: usize| {
4578            for skip_first in [false, true] {
4579                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4580
4581                let expected = get_expected_batches(&data, &selections, batch_size);
4582                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4583                assert_eq!(
4584                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4585                    expected,
4586                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4587                );
4588            }
4589        };
4590
4591        // total row count 7300
4592        // 1. test selection len more than one page row count
4593        do_test(1000, 1000);
4594
4595        // 2. test selection len less than one page row count
4596        do_test(20, 20);
4597
4598        // 3. test selection_len less than batch_size
4599        do_test(20, 5);
4600
4601        // 4. test selection_len more than batch_size
4602        // If batch_size < selection_len
4603        do_test(20, 5);
4604
4605        fn create_skip_reader(
4606            test_file: &File,
4607            batch_size: usize,
4608            selections: RowSelection,
4609        ) -> ParquetRecordBatchReader {
4610            let options = ArrowReaderOptions::new().with_page_index(true);
4611            let file = test_file.try_clone().unwrap();
4612            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4613                .unwrap()
4614                .with_batch_size(batch_size)
4615                .with_row_selection(selections)
4616                .build()
4617                .unwrap()
4618        }
4619    }
4620
4621    #[test]
4622    fn test_batch_size_overallocate() {
4623        let testdata = arrow::util::test_util::parquet_test_data();
4624        // `alltypes_plain.parquet` only have 8 rows
4625        let path = format!("{testdata}/alltypes_plain.parquet");
4626        let test_file = File::open(path).unwrap();
4627
4628        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4629        let num_rows = builder.metadata.file_metadata().num_rows();
4630        let reader = builder
4631            .with_batch_size(1024)
4632            .with_projection(ProjectionMask::all())
4633            .build()
4634            .unwrap();
4635        assert_ne!(1024, num_rows);
4636        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4637    }
4638
4639    #[test]
4640    fn test_read_with_page_index_enabled() {
4641        let testdata = arrow::util::test_util::parquet_test_data();
4642
4643        {
4644            // `alltypes_tiny_pages.parquet` has page index
4645            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4646            let test_file = File::open(path).unwrap();
4647            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4648                test_file,
4649                ArrowReaderOptions::new().with_page_index(true),
4650            )
4651            .unwrap();
4652            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4653            let reader = builder.build().unwrap();
4654            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4655            assert_eq!(batches.len(), 8);
4656        }
4657
4658        {
4659            // `alltypes_plain.parquet` doesn't have page index
4660            let path = format!("{testdata}/alltypes_plain.parquet");
4661            let test_file = File::open(path).unwrap();
4662            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4663                test_file,
4664                ArrowReaderOptions::new().with_page_index(true),
4665            )
4666            .unwrap();
4667            // Although `Vec<Vec<PageLoacation>>` of each row group is empty,
4668            // we should read the file successfully.
4669            assert!(builder.metadata().offset_index().is_none());
4670            let reader = builder.build().unwrap();
4671            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4672            assert_eq!(batches.len(), 1);
4673        }
4674    }
4675
4676    #[test]
4677    fn test_raw_repetition() {
4678        const MESSAGE_TYPE: &str = "
4679            message Log {
4680              OPTIONAL INT32 eventType;
4681              REPEATED INT32 category;
4682              REPEATED group filter {
4683                OPTIONAL INT32 error;
4684              }
4685            }
4686        ";
4687        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4688        let props = Default::default();
4689
4690        let mut buf = Vec::with_capacity(1024);
4691        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4692        let mut row_group_writer = writer.next_row_group().unwrap();
4693
4694        // column 0
4695        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4696        col_writer
4697            .typed::<Int32Type>()
4698            .write_batch(&[1], Some(&[1]), None)
4699            .unwrap();
4700        col_writer.close().unwrap();
4701        // column 1
4702        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4703        col_writer
4704            .typed::<Int32Type>()
4705            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4706            .unwrap();
4707        col_writer.close().unwrap();
4708        // column 2
4709        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4710        col_writer
4711            .typed::<Int32Type>()
4712            .write_batch(&[1], Some(&[1]), Some(&[0]))
4713            .unwrap();
4714        col_writer.close().unwrap();
4715
4716        let rg_md = row_group_writer.close().unwrap();
4717        assert_eq!(rg_md.num_rows(), 1);
4718        writer.close().unwrap();
4719
4720        let bytes = Bytes::from(buf);
4721
4722        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4723        let full = no_mask.next().unwrap().unwrap();
4724
4725        assert_eq!(full.num_columns(), 3);
4726
4727        for idx in 0..3 {
4728            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4729            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4730            let mut reader = b.with_projection(mask).build().unwrap();
4731            let projected = reader.next().unwrap().unwrap();
4732
4733            assert_eq!(projected.num_columns(), 1);
4734            assert_eq!(full.column(idx), projected.column(0));
4735        }
4736    }
4737
4738    #[test]
4739    fn test_read_lz4_raw() {
4740        let testdata = arrow::util::test_util::parquet_test_data();
4741        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4742        let file = File::open(path).unwrap();
4743
4744        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4745            .unwrap()
4746            .collect::<Result<Vec<_>, _>>()
4747            .unwrap();
4748        assert_eq!(batches.len(), 1);
4749        let batch = &batches[0];
4750
4751        assert_eq!(batch.num_columns(), 3);
4752        assert_eq!(batch.num_rows(), 4);
4753
4754        // https://github.com/apache/parquet-testing/pull/18
4755        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4756        assert_eq!(
4757            a.values(),
4758            &[1593604800, 1593604800, 1593604801, 1593604801]
4759        );
4760
4761        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4762        let a: Vec<_> = a.iter().flatten().collect();
4763        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4764
4765        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4766        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4767    }
4768
4769    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
4770    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4771    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4772    //    LZ4_HADOOP algorithm for compression.
4773    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4774    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4775    //    older parquet-cpp versions.
4776    //
4777    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4778    #[test]
4779    fn test_read_lz4_hadoop_fallback() {
4780        for file in [
4781            "hadoop_lz4_compressed.parquet",
4782            "non_hadoop_lz4_compressed.parquet",
4783        ] {
4784            let testdata = arrow::util::test_util::parquet_test_data();
4785            let path = format!("{testdata}/{file}");
4786            let file = File::open(path).unwrap();
4787            let expected_rows = 4;
4788
4789            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4790                .unwrap()
4791                .collect::<Result<Vec<_>, _>>()
4792                .unwrap();
4793            assert_eq!(batches.len(), 1);
4794            let batch = &batches[0];
4795
4796            assert_eq!(batch.num_columns(), 3);
4797            assert_eq!(batch.num_rows(), expected_rows);
4798
4799            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4800            assert_eq!(
4801                a.values(),
4802                &[1593604800, 1593604800, 1593604801, 1593604801]
4803            );
4804
4805            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4806            let b: Vec<_> = b.iter().flatten().collect();
4807            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4808
4809            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4810            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4811        }
4812    }
4813
4814    #[test]
4815    fn test_read_lz4_hadoop_large() {
4816        let testdata = arrow::util::test_util::parquet_test_data();
4817        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4818        let file = File::open(path).unwrap();
4819        let expected_rows = 10000;
4820
4821        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4822            .unwrap()
4823            .collect::<Result<Vec<_>, _>>()
4824            .unwrap();
4825        assert_eq!(batches.len(), 1);
4826        let batch = &batches[0];
4827
4828        assert_eq!(batch.num_columns(), 1);
4829        assert_eq!(batch.num_rows(), expected_rows);
4830
4831        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4832        let a: Vec<_> = a.iter().flatten().collect();
4833        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4834        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4835        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4836        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4837    }
4838
4839    #[test]
4840    #[cfg(feature = "snap")]
4841    fn test_read_nested_lists() {
4842        let testdata = arrow::util::test_util::parquet_test_data();
4843        let path = format!("{testdata}/nested_lists.snappy.parquet");
4844        let file = File::open(path).unwrap();
4845
4846        let f = file.try_clone().unwrap();
4847        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4848        let expected = reader.next().unwrap().unwrap();
4849        assert_eq!(expected.num_rows(), 3);
4850
4851        let selection = RowSelection::from(vec![
4852            RowSelector::skip(1),
4853            RowSelector::select(1),
4854            RowSelector::skip(1),
4855        ]);
4856        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4857            .unwrap()
4858            .with_row_selection(selection)
4859            .build()
4860            .unwrap();
4861
4862        let actual = reader.next().unwrap().unwrap();
4863        assert_eq!(actual.num_rows(), 1);
4864        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4865    }
4866
4867    #[test]
4868    fn test_arbitrary_decimal() {
4869        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4870        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4871            .with_precision_and_scale(19, 0)
4872            .unwrap();
4873        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4874            .with_precision_and_scale(12, 0)
4875            .unwrap();
4876        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4877            .with_precision_and_scale(17, 10)
4878            .unwrap();
4879
4880        let written = RecordBatch::try_from_iter([
4881            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4882            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4883            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4884        ])
4885        .unwrap();
4886
4887        let mut buffer = Vec::with_capacity(1024);
4888        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4889        writer.write(&written).unwrap();
4890        writer.close().unwrap();
4891
4892        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4893            .unwrap()
4894            .collect::<Result<Vec<_>, _>>()
4895            .unwrap();
4896
4897        assert_eq!(&written.slice(0, 8), &read[0]);
4898    }
4899
4900    #[test]
4901    fn test_list_skip() {
4902        let mut list = ListBuilder::new(Int32Builder::new());
4903        list.append_value([Some(1), Some(2)]);
4904        list.append_value([Some(3)]);
4905        list.append_value([Some(4)]);
4906        let list = list.finish();
4907        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4908
4909        // First page contains 2 values but only 1 row
4910        let props = WriterProperties::builder()
4911            .set_data_page_row_count_limit(1)
4912            .set_write_batch_size(2)
4913            .build();
4914
4915        let mut buffer = Vec::with_capacity(1024);
4916        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4917        writer.write(&batch).unwrap();
4918        writer.close().unwrap();
4919
4920        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4921        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4922            .unwrap()
4923            .with_row_selection(selection.into())
4924            .build()
4925            .unwrap();
4926        let out = reader.next().unwrap().unwrap();
4927        assert_eq!(out.num_rows(), 1);
4928        assert_eq!(out, batch.slice(2, 1));
4929    }
4930
4931    #[test]
4932    fn test_row_selection_interleaved_skip() -> Result<()> {
4933        let schema = Arc::new(Schema::new(vec![Field::new(
4934            "v",
4935            ArrowDataType::Int32,
4936            false,
4937        )]));
4938
4939        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
4940        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
4941
4942        let mut buffer = Vec::with_capacity(1024);
4943        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
4944        writer.write(&batch)?;
4945        writer.close()?;
4946
4947        let selection = RowSelection::from(vec![
4948            RowSelector::select(1),
4949            RowSelector::skip(2),
4950            RowSelector::select(2),
4951        ]);
4952
4953        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
4954            .with_batch_size(4)
4955            .with_row_selection(selection)
4956            .build()?;
4957
4958        let out = reader.next().unwrap()?;
4959        assert_eq!(out.num_rows(), 3);
4960        let values = out
4961            .column(0)
4962            .as_primitive::<arrow_array::types::Int32Type>()
4963            .values();
4964        assert_eq!(values, &[0, 3, 4]);
4965        assert!(reader.next().is_none());
4966        Ok(())
4967    }
4968
4969    #[test]
4970    fn test_row_selection_mask_sparse_rows() -> Result<()> {
4971        let schema = Arc::new(Schema::new(vec![Field::new(
4972            "v",
4973            ArrowDataType::Int32,
4974            false,
4975        )]));
4976
4977        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
4978        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
4979
4980        let mut buffer = Vec::with_capacity(1024);
4981        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
4982        writer.write(&batch)?;
4983        writer.close()?;
4984
4985        let total_rows = batch.num_rows();
4986        let ranges = (1..total_rows)
4987            .step_by(2)
4988            .map(|i| i..i + 1)
4989            .collect::<Vec<_>>();
4990        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
4991
4992        let selectors: Vec<RowSelector> = selection.clone().into();
4993        assert!(total_rows < selectors.len() * 8);
4994
4995        let bytes = Bytes::from(buffer);
4996
4997        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
4998            .with_batch_size(7)
4999            .with_row_selection(selection)
5000            .build()?;
5001
5002        let mut collected = Vec::new();
5003        for batch in reader {
5004            let batch = batch?;
5005            collected.extend_from_slice(
5006                batch
5007                    .column(0)
5008                    .as_primitive::<arrow_array::types::Int32Type>()
5009                    .values(),
5010            );
5011        }
5012
5013        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
5014        assert_eq!(collected, expected);
5015        Ok(())
5016    }
5017
5018    fn test_decimal32_roundtrip() {
5019        let d = |values: Vec<i32>, p: u8| {
5020            let iter = values.into_iter();
5021            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
5022                .with_precision_and_scale(p, 2)
5023                .unwrap()
5024        };
5025
5026        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5027        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
5028
5029        let mut buffer = Vec::with_capacity(1024);
5030        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5031        writer.write(&batch).unwrap();
5032        writer.close().unwrap();
5033
5034        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5035        let t1 = builder.parquet_schema().columns()[0].physical_type();
5036        assert_eq!(t1, PhysicalType::INT32);
5037
5038        let mut reader = builder.build().unwrap();
5039        assert_eq!(batch.schema(), reader.schema());
5040
5041        let out = reader.next().unwrap().unwrap();
5042        assert_eq!(batch, out);
5043    }
5044
5045    fn test_decimal64_roundtrip() {
5046        // Precision <= 9 -> INT32
5047        // Precision <= 18 -> INT64
5048
5049        let d = |values: Vec<i64>, p: u8| {
5050            let iter = values.into_iter();
5051            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5052                .with_precision_and_scale(p, 2)
5053                .unwrap()
5054        };
5055
5056        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5057        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5058        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5059
5060        let batch = RecordBatch::try_from_iter([
5061            ("d1", Arc::new(d1) as ArrayRef),
5062            ("d2", Arc::new(d2) as ArrayRef),
5063            ("d3", Arc::new(d3) as ArrayRef),
5064        ])
5065        .unwrap();
5066
5067        let mut buffer = Vec::with_capacity(1024);
5068        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5069        writer.write(&batch).unwrap();
5070        writer.close().unwrap();
5071
5072        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5073        let t1 = builder.parquet_schema().columns()[0].physical_type();
5074        assert_eq!(t1, PhysicalType::INT32);
5075        let t2 = builder.parquet_schema().columns()[1].physical_type();
5076        assert_eq!(t2, PhysicalType::INT64);
5077        let t3 = builder.parquet_schema().columns()[2].physical_type();
5078        assert_eq!(t3, PhysicalType::INT64);
5079
5080        let mut reader = builder.build().unwrap();
5081        assert_eq!(batch.schema(), reader.schema());
5082
5083        let out = reader.next().unwrap().unwrap();
5084        assert_eq!(batch, out);
5085    }
5086
5087    fn test_decimal_roundtrip<T: DecimalType>() {
5088        // Precision <= 9 -> INT32
5089        // Precision <= 18 -> INT64
5090        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5091
5092        let d = |values: Vec<usize>, p: u8| {
5093            let iter = values.into_iter().map(T::Native::usize_as);
5094            PrimitiveArray::<T>::from_iter_values(iter)
5095                .with_precision_and_scale(p, 2)
5096                .unwrap()
5097        };
5098
5099        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5100        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5101        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5102        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5103
5104        let batch = RecordBatch::try_from_iter([
5105            ("d1", Arc::new(d1) as ArrayRef),
5106            ("d2", Arc::new(d2) as ArrayRef),
5107            ("d3", Arc::new(d3) as ArrayRef),
5108            ("d4", Arc::new(d4) as ArrayRef),
5109        ])
5110        .unwrap();
5111
5112        let mut buffer = Vec::with_capacity(1024);
5113        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5114        writer.write(&batch).unwrap();
5115        writer.close().unwrap();
5116
5117        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5118        let t1 = builder.parquet_schema().columns()[0].physical_type();
5119        assert_eq!(t1, PhysicalType::INT32);
5120        let t2 = builder.parquet_schema().columns()[1].physical_type();
5121        assert_eq!(t2, PhysicalType::INT64);
5122        let t3 = builder.parquet_schema().columns()[2].physical_type();
5123        assert_eq!(t3, PhysicalType::INT64);
5124        let t4 = builder.parquet_schema().columns()[3].physical_type();
5125        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5126
5127        let mut reader = builder.build().unwrap();
5128        assert_eq!(batch.schema(), reader.schema());
5129
5130        let out = reader.next().unwrap().unwrap();
5131        assert_eq!(batch, out);
5132    }
5133
5134    #[test]
5135    fn test_decimal() {
5136        test_decimal32_roundtrip();
5137        test_decimal64_roundtrip();
5138        test_decimal_roundtrip::<Decimal128Type>();
5139        test_decimal_roundtrip::<Decimal256Type>();
5140    }
5141
5142    #[test]
5143    fn test_list_selection() {
5144        let schema = Arc::new(Schema::new(vec![Field::new_list(
5145            "list",
5146            Field::new_list_field(ArrowDataType::Utf8, true),
5147            false,
5148        )]));
5149        let mut buf = Vec::with_capacity(1024);
5150
5151        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5152
5153        for i in 0..2 {
5154            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5155            for j in 0..1024 {
5156                list_a_builder.values().append_value(format!("{i} {j}"));
5157                list_a_builder.append(true);
5158            }
5159            let batch =
5160                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5161                    .unwrap();
5162            writer.write(&batch).unwrap();
5163        }
5164        let _metadata = writer.close().unwrap();
5165
5166        let buf = Bytes::from(buf);
5167        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5168            .unwrap()
5169            .with_row_selection(RowSelection::from(vec![
5170                RowSelector::skip(100),
5171                RowSelector::select(924),
5172                RowSelector::skip(100),
5173                RowSelector::select(924),
5174            ]))
5175            .build()
5176            .unwrap();
5177
5178        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5179        let batch = concat_batches(&schema, &batches).unwrap();
5180
5181        assert_eq!(batch.num_rows(), 924 * 2);
5182        let list = batch.column(0).as_list::<i32>();
5183
5184        for w in list.value_offsets().windows(2) {
5185            assert_eq!(w[0] + 1, w[1])
5186        }
5187        let mut values = list.values().as_string::<i32>().iter();
5188
5189        for i in 0..2 {
5190            for j in 100..1024 {
5191                let expected = format!("{i} {j}");
5192                assert_eq!(values.next().unwrap().unwrap(), &expected);
5193            }
5194        }
5195    }
5196
5197    #[test]
5198    fn test_list_selection_fuzz() {
5199        let mut rng = rng();
5200        let schema = Arc::new(Schema::new(vec![Field::new_list(
5201            "list",
5202            Field::new_list(
5203                Field::LIST_FIELD_DEFAULT_NAME,
5204                Field::new_list_field(ArrowDataType::Int32, true),
5205                true,
5206            ),
5207            true,
5208        )]));
5209        let mut buf = Vec::with_capacity(1024);
5210        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5211
5212        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5213
5214        for _ in 0..2048 {
5215            if rng.random_bool(0.2) {
5216                list_a_builder.append(false);
5217                continue;
5218            }
5219
5220            let list_a_len = rng.random_range(0..10);
5221            let list_b_builder = list_a_builder.values();
5222
5223            for _ in 0..list_a_len {
5224                if rng.random_bool(0.2) {
5225                    list_b_builder.append(false);
5226                    continue;
5227                }
5228
5229                let list_b_len = rng.random_range(0..10);
5230                let int_builder = list_b_builder.values();
5231                for _ in 0..list_b_len {
5232                    match rng.random_bool(0.2) {
5233                        true => int_builder.append_null(),
5234                        false => int_builder.append_value(rng.random()),
5235                    }
5236                }
5237                list_b_builder.append(true)
5238            }
5239            list_a_builder.append(true);
5240        }
5241
5242        let array = Arc::new(list_a_builder.finish());
5243        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5244
5245        writer.write(&batch).unwrap();
5246        let _metadata = writer.close().unwrap();
5247
5248        let buf = Bytes::from(buf);
5249
5250        let cases = [
5251            vec![
5252                RowSelector::skip(100),
5253                RowSelector::select(924),
5254                RowSelector::skip(100),
5255                RowSelector::select(924),
5256            ],
5257            vec![
5258                RowSelector::select(924),
5259                RowSelector::skip(100),
5260                RowSelector::select(924),
5261                RowSelector::skip(100),
5262            ],
5263            vec![
5264                RowSelector::skip(1023),
5265                RowSelector::select(1),
5266                RowSelector::skip(1023),
5267                RowSelector::select(1),
5268            ],
5269            vec![
5270                RowSelector::select(1),
5271                RowSelector::skip(1023),
5272                RowSelector::select(1),
5273                RowSelector::skip(1023),
5274            ],
5275        ];
5276
5277        for batch_size in [100, 1024, 2048] {
5278            for selection in &cases {
5279                let selection = RowSelection::from(selection.clone());
5280                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5281                    .unwrap()
5282                    .with_row_selection(selection.clone())
5283                    .with_batch_size(batch_size)
5284                    .build()
5285                    .unwrap();
5286
5287                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5288                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5289                assert_eq!(actual.num_rows(), selection.row_count());
5290
5291                let mut batch_offset = 0;
5292                let mut actual_offset = 0;
5293                for selector in selection.iter() {
5294                    if selector.skip {
5295                        batch_offset += selector.row_count;
5296                        continue;
5297                    }
5298
5299                    assert_eq!(
5300                        batch.slice(batch_offset, selector.row_count),
5301                        actual.slice(actual_offset, selector.row_count)
5302                    );
5303
5304                    batch_offset += selector.row_count;
5305                    actual_offset += selector.row_count;
5306                }
5307            }
5308        }
5309    }
5310
5311    #[test]
5312    fn test_read_old_nested_list() {
5313        use arrow::datatypes::DataType;
5314        use arrow::datatypes::ToByteSlice;
5315
5316        let testdata = arrow::util::test_util::parquet_test_data();
5317        // message my_record {
5318        //     REQUIRED group a (LIST) {
5319        //         REPEATED group array (LIST) {
5320        //             REPEATED INT32 array;
5321        //         }
5322        //     }
5323        // }
5324        // should be read as list<list<int32>>
5325        let path = format!("{testdata}/old_list_structure.parquet");
5326        let test_file = File::open(path).unwrap();
5327
5328        // create expected ListArray
5329        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5330
5331        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5332        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5333
5334        // Construct a list array from the above two
5335        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5336            "array",
5337            DataType::Int32,
5338            false,
5339        ))))
5340        .len(2)
5341        .add_buffer(a_value_offsets)
5342        .add_child_data(a_values.into_data())
5343        .build()
5344        .unwrap();
5345        let a = ListArray::from(a_list_data);
5346
5347        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5348        let mut reader = builder.build().unwrap();
5349        let out = reader.next().unwrap().unwrap();
5350        assert_eq!(out.num_rows(), 1);
5351        assert_eq!(out.num_columns(), 1);
5352        // grab first column
5353        let c0 = out.column(0);
5354        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5355        // get first row: [[1, 2], [3, 4]]
5356        let r0 = c0arr.value(0);
5357        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5358        assert_eq!(r0arr, &a);
5359    }
5360
5361    #[test]
5362    fn test_map_no_value() {
5363        // File schema:
5364        // message schema {
5365        //   required group my_map (MAP) {
5366        //     repeated group key_value {
5367        //       required int32 key;
5368        //       optional int32 value;
5369        //     }
5370        //   }
5371        //   required group my_map_no_v (MAP) {
5372        //     repeated group key_value {
5373        //       required int32 key;
5374        //     }
5375        //   }
5376        //   required group my_list (LIST) {
5377        //     repeated group list {
5378        //       required int32 element;
5379        //     }
5380        //   }
5381        // }
5382        let testdata = arrow::util::test_util::parquet_test_data();
5383        let path = format!("{testdata}/map_no_value.parquet");
5384        let file = File::open(path).unwrap();
5385
5386        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5387            .unwrap()
5388            .build()
5389            .unwrap();
5390        let out = reader.next().unwrap().unwrap();
5391        assert_eq!(out.num_rows(), 3);
5392        assert_eq!(out.num_columns(), 3);
5393        // my_map_no_v and my_list columns should now be equivalent
5394        let c0 = out.column(1).as_list::<i32>();
5395        let c1 = out.column(2).as_list::<i32>();
5396        assert_eq!(c0.len(), c1.len());
5397        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5398    }
5399
5400    #[test]
5401    fn test_row_filter_full_page_skip_is_handled() {
5402        let first_value: i64 = 1111;
5403        let last_value: i64 = 9999;
5404        let num_rows: usize = 12;
5405
5406        // build data with row selection average length 4
5407        // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999)
5408        // The Row Selection would be [1111, (skip 10), 9999]
5409        let schema = Arc::new(Schema::new(vec![
5410            Field::new("key", arrow_schema::DataType::Int64, false),
5411            Field::new("value", arrow_schema::DataType::Int64, false),
5412        ]));
5413
5414        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
5415        int_values[0] = first_value;
5416        int_values[num_rows - 1] = last_value;
5417        let keys = Int64Array::from(int_values.clone());
5418        let values = Int64Array::from(int_values.clone());
5419        let batch = RecordBatch::try_new(
5420            Arc::clone(&schema),
5421            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
5422        )
5423        .unwrap();
5424
5425        let props = WriterProperties::builder()
5426            .set_write_batch_size(2)
5427            .set_data_page_row_count_limit(2)
5428            .build();
5429
5430        let mut buffer = Vec::new();
5431        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
5432        writer.write(&batch).unwrap();
5433        writer.close().unwrap();
5434        let data = Bytes::from(buffer);
5435
5436        let options = ArrowReaderOptions::new().with_page_index(true);
5437        let builder =
5438            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
5439        let schema = builder.parquet_schema().clone();
5440        let filter_mask = ProjectionMask::leaves(&schema, [0]);
5441
5442        let make_predicate = |mask: ProjectionMask| {
5443            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
5444                let column = batch.column(0);
5445                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
5446                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
5447                or(&match_first, &match_second)
5448            })
5449        };
5450
5451        let options = ArrowReaderOptions::new().with_page_index(true);
5452        let predicate = make_predicate(filter_mask.clone());
5453
5454        // The batch size is set to 12 to read all rows in one go after filtering
5455        // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded.
5456        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
5457            .unwrap()
5458            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
5459            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
5460            .with_batch_size(12)
5461            .build()
5462            .unwrap();
5463
5464        // Predicate pruning used to panic once mask-backed plans removed whole pages.
5465        // Collecting into batches validates the plan now downgrades to selectors instead.
5466        let schema = reader.schema().clone();
5467        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5468        let result = concat_batches(&schema, &batches).unwrap();
5469        assert_eq!(result.num_rows(), 2);
5470    }
5471
5472    #[test]
5473    fn test_get_row_group_column_bloom_filter_with_length() {
5474        // convert to new parquet file with bloom_filter_length
5475        let testdata = arrow::util::test_util::parquet_test_data();
5476        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5477        let file = File::open(path).unwrap();
5478        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5479        let schema = builder.schema().clone();
5480        let reader = builder.build().unwrap();
5481
5482        let mut parquet_data = Vec::new();
5483        let props = WriterProperties::builder()
5484            .set_bloom_filter_enabled(true)
5485            .build();
5486        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
5487        for batch in reader {
5488            let batch = batch.unwrap();
5489            writer.write(&batch).unwrap();
5490        }
5491        writer.close().unwrap();
5492
5493        // test the new parquet file
5494        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
5495    }
5496
5497    #[test]
5498    fn test_get_row_group_column_bloom_filter_without_length() {
5499        let testdata = arrow::util::test_util::parquet_test_data();
5500        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5501        let data = Bytes::from(std::fs::read(path).unwrap());
5502        test_get_row_group_column_bloom_filter(data, false);
5503    }
5504
5505    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5506        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5507
5508        let metadata = builder.metadata();
5509        assert_eq!(metadata.num_row_groups(), 1);
5510        let row_group = metadata.row_group(0);
5511        let column = row_group.column(0);
5512        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5513
5514        let sbbf = builder
5515            .get_row_group_column_bloom_filter(0, 0)
5516            .unwrap()
5517            .unwrap();
5518        assert!(sbbf.check(&"Hello"));
5519        assert!(!sbbf.check(&"Hello_Not_Exists"));
5520    }
5521
5522    #[test]
5523    fn test_read_unknown_logical_type() {
5524        let testdata = arrow::util::test_util::parquet_test_data();
5525        let path = format!("{testdata}/unknown-logical-type.parquet");
5526        let test_file = File::open(path).unwrap();
5527
5528        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5529            .expect("Error creating reader builder");
5530
5531        let schema = builder.metadata().file_metadata().schema_descr();
5532        assert_eq!(
5533            schema.column(0).logical_type_ref(),
5534            Some(&LogicalType::String)
5535        );
5536        assert_eq!(
5537            schema.column(1).logical_type_ref(),
5538            Some(&LogicalType::_Unknown { field_id: 2555 })
5539        );
5540        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5541
5542        let mut reader = builder.build().unwrap();
5543        let out = reader.next().unwrap().unwrap();
5544        assert_eq!(out.num_rows(), 3);
5545        assert_eq!(out.num_columns(), 2);
5546    }
5547
5548    #[test]
5549    fn test_read_row_numbers() {
5550        let file = write_parquet_from_iter(vec![(
5551            "value",
5552            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5553        )]);
5554        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5555
5556        let row_number_field = Arc::new(
5557            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5558        );
5559
5560        let options = ArrowReaderOptions::new()
5561            .with_schema(Arc::new(Schema::new(supplied_fields)))
5562            .with_virtual_columns(vec![row_number_field.clone()])
5563            .unwrap();
5564        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5565            file.try_clone().unwrap(),
5566            options,
5567        )
5568        .expect("reader builder with schema")
5569        .build()
5570        .expect("reader with schema");
5571
5572        let batch = arrow_reader.next().unwrap().unwrap();
5573        let schema = Arc::new(Schema::new(vec![
5574            Field::new("value", ArrowDataType::Int64, false),
5575            (*row_number_field).clone(),
5576        ]));
5577
5578        assert_eq!(batch.schema(), schema);
5579        assert_eq!(batch.num_columns(), 2);
5580        assert_eq!(batch.num_rows(), 3);
5581        assert_eq!(
5582            batch
5583                .column(0)
5584                .as_primitive::<types::Int64Type>()
5585                .iter()
5586                .collect::<Vec<_>>(),
5587            vec![Some(1), Some(2), Some(3)]
5588        );
5589        assert_eq!(
5590            batch
5591                .column(1)
5592                .as_primitive::<types::Int64Type>()
5593                .iter()
5594                .collect::<Vec<_>>(),
5595            vec![Some(0), Some(1), Some(2)]
5596        );
5597    }
5598
5599    #[test]
5600    fn test_read_only_row_numbers() {
5601        let file = write_parquet_from_iter(vec![(
5602            "value",
5603            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5604        )]);
5605        let row_number_field = Arc::new(
5606            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5607        );
5608        let options = ArrowReaderOptions::new()
5609            .with_virtual_columns(vec![row_number_field.clone()])
5610            .unwrap();
5611        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5612        let num_columns = metadata
5613            .metadata
5614            .file_metadata()
5615            .schema_descr()
5616            .num_columns();
5617
5618        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5619            .with_projection(ProjectionMask::none(num_columns))
5620            .build()
5621            .expect("reader with schema");
5622
5623        let batch = arrow_reader.next().unwrap().unwrap();
5624        let schema = Arc::new(Schema::new(vec![row_number_field]));
5625
5626        assert_eq!(batch.schema(), schema);
5627        assert_eq!(batch.num_columns(), 1);
5628        assert_eq!(batch.num_rows(), 3);
5629        assert_eq!(
5630            batch
5631                .column(0)
5632                .as_primitive::<types::Int64Type>()
5633                .iter()
5634                .collect::<Vec<_>>(),
5635            vec![Some(0), Some(1), Some(2)]
5636        );
5637    }
5638
5639    #[test]
5640    fn test_read_row_numbers_row_group_order() -> Result<()> {
5641        // Make a parquet file with 100 rows split across 2 row groups
5642        let array = Int64Array::from_iter_values(5000..5100);
5643        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5644        let mut buffer = Vec::new();
5645        let options = WriterProperties::builder()
5646            .set_max_row_group_size(50)
5647            .build();
5648        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5649        // write in 10 row batches as the size limits are enforced after each batch
5650        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5651            writer.write(&batch_chunk)?;
5652        }
5653        writer.close()?;
5654
5655        let row_number_field = Arc::new(
5656            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5657        );
5658
5659        let buffer = Bytes::from(buffer);
5660
5661        let options =
5662            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5663
5664        // read out with normal options
5665        let arrow_reader =
5666            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5667                .build()?;
5668
5669        assert_eq!(
5670            ValuesAndRowNumbers {
5671                values: (5000..5100).collect(),
5672                row_numbers: (0..100).collect()
5673            },
5674            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5675        );
5676
5677        // Now read, out of order row groups
5678        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5679            .with_row_groups(vec![1, 0])
5680            .build()?;
5681
5682        assert_eq!(
5683            ValuesAndRowNumbers {
5684                values: (5050..5100).chain(5000..5050).collect(),
5685                row_numbers: (50..100).chain(0..50).collect(),
5686            },
5687            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5688        );
5689
5690        Ok(())
5691    }
5692
5693    #[derive(Debug, PartialEq)]
5694    struct ValuesAndRowNumbers {
5695        values: Vec<i64>,
5696        row_numbers: Vec<i64>,
5697    }
5698    impl ValuesAndRowNumbers {
5699        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5700            let mut values = vec![];
5701            let mut row_numbers = vec![];
5702            for batch in reader {
5703                let batch = batch.expect("Could not read batch");
5704                values.extend(
5705                    batch
5706                        .column_by_name("col")
5707                        .expect("Could not get col column")
5708                        .as_primitive::<arrow::datatypes::Int64Type>()
5709                        .iter()
5710                        .map(|v| v.expect("Could not get value")),
5711                );
5712
5713                row_numbers.extend(
5714                    batch
5715                        .column_by_name("row_number")
5716                        .expect("Could not get row_number column")
5717                        .as_primitive::<arrow::datatypes::Int64Type>()
5718                        .iter()
5719                        .map(|v| v.expect("Could not get row number"))
5720                        .collect::<Vec<_>>(),
5721                );
5722            }
5723            Self {
5724                values,
5725                row_numbers,
5726            }
5727        }
5728    }
5729
5730    #[test]
5731    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5732        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5733        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5734        assert_eq!(
5735            ArrowReaderOptions::new()
5736                .with_virtual_columns(vec![regular_field])
5737                .unwrap_err()
5738                .to_string(),
5739            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5740        );
5741    }
5742
5743    #[test]
5744    fn test_row_numbers_with_multiple_row_groups() {
5745        test_row_numbers_with_multiple_row_groups_helper(
5746            false,
5747            |path, selection, _row_filter, batch_size| {
5748                let file = File::open(path).unwrap();
5749                let row_number_field = Arc::new(
5750                    Field::new("row_number", ArrowDataType::Int64, false)
5751                        .with_extension_type(RowNumber),
5752                );
5753                let options = ArrowReaderOptions::new()
5754                    .with_virtual_columns(vec![row_number_field])
5755                    .unwrap();
5756                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5757                    .unwrap()
5758                    .with_row_selection(selection)
5759                    .with_batch_size(batch_size)
5760                    .build()
5761                    .expect("Could not create reader");
5762                reader
5763                    .collect::<Result<Vec<_>, _>>()
5764                    .expect("Could not read")
5765            },
5766        );
5767    }
5768
5769    #[test]
5770    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5771        test_row_numbers_with_multiple_row_groups_helper(
5772            true,
5773            |path, selection, row_filter, batch_size| {
5774                let file = File::open(path).unwrap();
5775                let row_number_field = Arc::new(
5776                    Field::new("row_number", ArrowDataType::Int64, false)
5777                        .with_extension_type(RowNumber),
5778                );
5779                let options = ArrowReaderOptions::new()
5780                    .with_virtual_columns(vec![row_number_field])
5781                    .unwrap();
5782                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5783                    .unwrap()
5784                    .with_row_selection(selection)
5785                    .with_batch_size(batch_size)
5786                    .with_row_filter(row_filter.expect("No filter"))
5787                    .build()
5788                    .expect("Could not create reader");
5789                reader
5790                    .collect::<Result<Vec<_>, _>>()
5791                    .expect("Could not read")
5792            },
5793        );
5794    }
5795
5796    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5797        use_filter: bool,
5798        test_case: F,
5799    ) where
5800        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5801    {
5802        let seed: u64 = random();
5803        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5804        let mut rng = StdRng::seed_from_u64(seed);
5805
5806        use tempfile::TempDir;
5807        let tempdir = TempDir::new().expect("Could not create temp dir");
5808
5809        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5810
5811        let path = tempdir.path().join("test.parquet");
5812        std::fs::write(&path, bytes).expect("Could not write file");
5813
5814        let mut case = vec![];
5815        let mut remaining = metadata.file_metadata().num_rows();
5816        while remaining > 0 {
5817            let row_count = rng.random_range(1..=remaining);
5818            remaining -= row_count;
5819            case.push(RowSelector {
5820                row_count: row_count as usize,
5821                skip: rng.random_bool(0.5),
5822            });
5823        }
5824
5825        let filter = use_filter.then(|| {
5826            let filter = (0..metadata.file_metadata().num_rows())
5827                .map(|_| rng.random_bool(0.99))
5828                .collect::<Vec<_>>();
5829            let mut filter_offset = 0;
5830            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5831                ProjectionMask::all(),
5832                move |b| {
5833                    let array = BooleanArray::from_iter(
5834                        filter
5835                            .iter()
5836                            .skip(filter_offset)
5837                            .take(b.num_rows())
5838                            .map(|x| Some(*x)),
5839                    );
5840                    filter_offset += b.num_rows();
5841                    Ok(array)
5842                },
5843            ))])
5844        });
5845
5846        let selection = RowSelection::from(case);
5847        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5848
5849        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5850            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5851            return;
5852        }
5853        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5854            .expect("Failed to concatenate");
5855        // assert_eq!(selection.row_count(), actual.num_rows());
5856        let values = actual
5857            .column(0)
5858            .as_primitive::<types::Int64Type>()
5859            .iter()
5860            .collect::<Vec<_>>();
5861        let row_numbers = actual
5862            .column(1)
5863            .as_primitive::<types::Int64Type>()
5864            .iter()
5865            .collect::<Vec<_>>();
5866        assert_eq!(
5867            row_numbers
5868                .into_iter()
5869                .map(|number| number.map(|number| number + 1))
5870                .collect::<Vec<_>>(),
5871            values
5872        );
5873    }
5874
5875    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5876        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5877            "value",
5878            ArrowDataType::Int64,
5879            false,
5880        )])));
5881
5882        let mut buf = Vec::with_capacity(1024);
5883        let mut writer =
5884            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5885
5886        let mut values = 1..=rng.random_range(1..4096);
5887        while !values.is_empty() {
5888            let batch_values = values
5889                .by_ref()
5890                .take(rng.random_range(1..4096))
5891                .collect::<Vec<_>>();
5892            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5893            let batch =
5894                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5895            writer.write(&batch).expect("Could not write batch");
5896            writer.flush().expect("Could not flush");
5897        }
5898        let metadata = writer.close().expect("Could not close writer");
5899
5900        (Bytes::from(buf), metadata)
5901    }
5902}