parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a reader that reads parquet data into arrow [`RecordBatch`]es
19
20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
32use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
33use crate::bloom_filter::{
34    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
35};
36use crate::column::page::{PageIterator, PageReader};
37#[cfg(feature = "encryption")]
38use crate::encryption::decrypt::FileDecryptionProperties;
39use crate::errors::{ParquetError, Result};
40use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
41use crate::file::reader::{ChunkReader, SerializedPageReader};
42use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
43use crate::schema::types::SchemaDescriptor;
44
45use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
46pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
47
48mod filter;
49pub mod metrics;
50mod read_plan;
51mod selection;
52pub mod statistics;
53
54/// Builder for constructing Parquet readers that decode into [Apache Arrow]
55/// arrays.
56///
57/// Most users should use one of the following specializations:
58///
59/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
60/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
61///
62/// # Features
63/// * Projection pushdown: [`Self::with_projection`]
64/// * Cached metadata: [`ArrowReaderMetadata::load`]
65/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
66/// * Row group filtering: [`Self::with_row_groups`]
67/// * Range filtering: [`Self::with_row_selection`]
68/// * Row level filtering: [`Self::with_row_filter`]
69///
70/// # Implementing Predicate Pushdown
71///
72/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
73/// process, which is efficient and allows the most low level optimizations.
74///
75/// However, most Parquet based systems will apply filters at many steps prior
76/// to decoding such as pruning files, row groups and data pages. This crate
77/// provides the low level APIs needed to implement such filtering, but does not
78/// include any logic to actually evaluate predicates. For example:
79///
80/// * [`Self::with_row_groups`] for Row Group pruning
81/// * [`Self::with_row_selection`] for data page pruning
82/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
83///
84/// The rationale for this design is that implementing predicate pushdown is a
85/// complex topic and varies significantly from system to system. For example
86///
87/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
88/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
89/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
90/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
91///
92/// You can read more about this design in the [Querying Parquet with
93/// Millisecond Latency] Arrow blog post.
94///
95/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
96/// [Apache Arrow]: https://arrow.apache.org/
97/// [`StatisticsConverter`]: statistics::StatisticsConverter
98/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
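///
/// # Example: selecting rows and columns
///
/// A minimal sketch of combining projection pushdown with a [`RowSelection`]
/// (the in-memory file, column names and selected ranges below are
/// illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};
/// # use parquet::arrow::{ArrowWriter, ProjectionMask};
/// # let mut file: Vec<u8> = Vec::with_capacity(1024);
/// # let schema = Arc::new(Schema::new(vec![
/// #     Field::new("a", DataType::Int32, false),
/// #     Field::new("b", DataType::Int32, false),
/// # ]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![
/// #     Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
/// #     Arc::new(Int32Array::from(vec![5, 6, 7, 8])),
/// # ]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// # let file = Bytes::from(file);
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
/// // Read only the first leaf column ("a") ...
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// // ... and only rows 1-2 (skip the first row, take two, skip the rest)
/// let selection = RowSelection::from(vec![
///     RowSelector::skip(1),
///     RowSelector::select(2),
///     RowSelector::skip(1),
/// ]);
/// let reader = builder
///     .with_projection(mask)
///     .with_row_selection(selection)
///     .build()
///     .unwrap();
/// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
/// assert_eq!(batches[0].num_rows(), 2);
/// assert_eq!(batches[0].num_columns(), 1);
/// ```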
99pub struct ArrowReaderBuilder<T> {
100    pub(crate) input: T,
101
102    pub(crate) metadata: Arc<ParquetMetaData>,
103
104    pub(crate) schema: SchemaRef,
105
106    pub(crate) fields: Option<Arc<ParquetField>>,
107
108    pub(crate) batch_size: usize,
109
110    pub(crate) row_groups: Option<Vec<usize>>,
111
112    pub(crate) projection: ProjectionMask,
113
114    pub(crate) filter: Option<RowFilter>,
115
116    pub(crate) selection: Option<RowSelection>,
117
118    pub(crate) limit: Option<usize>,
119
120    pub(crate) offset: Option<usize>,
121
122    pub(crate) metrics: ArrowReaderMetrics,
123
124    pub(crate) max_predicate_cache_size: usize,
125}
126
127impl<T: Debug> Debug for ArrowReaderBuilder<T> {
128    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("ArrowReaderBuilder<T>")
130            .field("input", &self.input)
131            .field("metadata", &self.metadata)
132            .field("schema", &self.schema)
133            .field("fields", &self.fields)
134            .field("batch_size", &self.batch_size)
135            .field("row_groups", &self.row_groups)
136            .field("projection", &self.projection)
137            .field("filter", &self.filter)
138            .field("selection", &self.selection)
139            .field("limit", &self.limit)
140            .field("offset", &self.offset)
141            .field("metrics", &self.metrics)
142            .finish()
143    }
144}
145
146impl<T> ArrowReaderBuilder<T> {
147    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
148        Self {
149            input,
150            metadata: metadata.metadata,
151            schema: metadata.schema,
152            fields: metadata.fields,
153            batch_size: 1024,
154            row_groups: None,
155            projection: ProjectionMask::all(),
156            filter: None,
157            selection: None,
158            limit: None,
159            offset: None,
160            metrics: ArrowReaderMetrics::Disabled,
161            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
162        }
163    }
164
165    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
166    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
167        &self.metadata
168    }
169
170    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
171    pub fn parquet_schema(&self) -> &SchemaDescriptor {
172        self.metadata.file_metadata().schema_descr()
173    }
174
175    /// Returns the arrow [`SchemaRef`] for this parquet file
176    pub fn schema(&self) -> &SchemaRef {
177        &self.schema
178    }
179
180    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
    /// If `batch_size` is larger than the number of rows in the file, the file's row count is used instead.
182    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
184        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
185        Self { batch_size, ..self }
186    }
187
188    /// Only read data from the provided row group indexes
189    ///
190    /// This is also called row group filtering
191    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
192        Self {
193            row_groups: Some(row_groups),
194            ..self
195        }
196    }
197
198    /// Only read data from the provided column indexes
199    pub fn with_projection(self, mask: ProjectionMask) -> Self {
200        Self {
201            projection: mask,
202            ..self
203        }
204    }
205
206    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
207    /// data into memory.
208    ///
209    /// This feature is used to restrict which rows are decoded within row
210    /// groups, skipping ranges of rows that are not needed. Such selections
211    /// could be determined by evaluating predicates against the parquet page
212    /// [`Index`] or some other external information available to a query
213    /// engine.
214    ///
215    /// # Notes
216    ///
217    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
218    /// applying the row selection, and therefore rows from skipped row groups
219    /// should not be included in the [`RowSelection`] (see example below)
220    ///
221    /// It is recommended to enable writing the page index if using this
222    /// functionality, to allow more efficient skipping over data pages. See
223    /// [`ArrowReaderOptions::with_page_index`].
224    ///
225    /// # Example
226    ///
227    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
228    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
229    /// row group 3:
230    ///
231    /// ```text
232    ///   Row Group 0, 1000 rows (selected)
233    ///   Row Group 1, 1000 rows (skipped)
234    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
235    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
236    /// ```
237    ///
238    /// You could pass the following [`RowSelection`]:
239    ///
240    /// ```text
241    ///  Select 1000    (scan all rows in row group 0)
242    ///  Skip 50        (skip the first 50 rows in row group 2)
243    ///  Select 50      (scan rows 50-100 in row group 2)
244    ///  Skip 900       (skip the remaining rows in row group 2)
245    ///  Skip 200       (skip the first 200 rows in row group 3)
246    ///  Select 100     (scan rows 200-300 in row group 3)
247    ///  Skip 700       (skip the remaining rows in row group 3)
248    /// ```
249    /// Note there is no entry for the (entirely) skipped row group 1.
250    ///
251    /// Note you can represent the same selection with fewer entries. Instead of
252    ///
253    /// ```text
254    ///  Skip 900       (skip the remaining rows in row group 2)
255    ///  Skip 200       (skip the first 200 rows in row group 3)
256    /// ```
257    ///
258    /// you could use
259    ///
260    /// ```text
261    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
262    /// ```
263    ///
264    /// [`Index`]: crate::file::page_index::index::Index
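    ///
    /// A minimal sketch of constructing the selection described above (the row
    /// counts simply mirror the text example):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(900),    // skip the remaining rows in row group 2
    ///     RowSelector::skip(200),    // skip the first 200 rows in row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```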
265    pub fn with_row_selection(self, selection: RowSelection) -> Self {
266        Self {
267            selection: Some(selection),
268            ..self
269        }
270    }
271
272    /// Provide a [`RowFilter`] to skip decoding rows
273    ///
274    /// Row filters are applied after row group selection and row selection
275    ///
276    /// It is recommended to enable reading the page index if using this functionality, to allow
277    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
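    ///
    /// A minimal sketch of constructing a [`RowFilter`] from an [`ArrowPredicateFn`]
    /// (the in-memory file, column index and predicate are illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{cast::AsArray, types::Int32Type, BooleanArray, Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    /// # use parquet::arrow::{ArrowWriter, ProjectionMask};
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // The predicate is evaluated only against the columns in its projection,
    /// // here the first leaf column of the file
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    ///     // Keep rows where the value is greater than 1
    ///     let col = batch.column(0).as_primitive::<Int32Type>();
    ///     Ok(BooleanArray::from_iter(
    ///         col.iter().map(|v| Some(v.map(|v| v > 1).unwrap_or(false))),
    ///     ))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// assert_eq!(batches[0].num_rows(), 2);
    /// ```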
278    pub fn with_row_filter(self, filter: RowFilter) -> Self {
279        Self {
280            filter: Some(filter),
281            ..self
282        }
283    }
284
285    /// Provide a limit to the number of rows to be read
286    ///
287    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
288    /// allowing it to limit the final set of rows decoded after any pushed down predicates
289    ///
290    /// It is recommended to enable reading the page index if using this functionality, to allow
291    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
292    pub fn with_limit(self, limit: usize) -> Self {
293        Self {
294            limit: Some(limit),
295            ..self
296        }
297    }
298
299    /// Provide an offset to skip over the given number of rows
300    ///
301    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
302    /// allowing it to skip rows after any pushed down predicates
303    ///
304    /// It is recommended to enable reading the page index if using this functionality, to allow
305    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
306    pub fn with_offset(self, offset: usize) -> Self {
307        Self {
308            offset: Some(offset),
309            ..self
310        }
311    }
312
313    /// Specify metrics collection during reading
314    ///
315    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
316    /// clone of the provided metrics to the builder.
317    ///
318    /// For example:
319    ///
320    /// ```rust
321    /// # use std::sync::Arc;
322    /// # use bytes::Bytes;
323    /// # use arrow_array::{Int32Array, RecordBatch};
324    /// # use arrow_schema::{DataType, Field, Schema};
325    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
326    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
327    /// # use parquet::arrow::ArrowWriter;
328    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
329    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
330    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
331    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
332    /// # writer.write(&batch).unwrap();
333    /// # writer.close().unwrap();
334    /// # let file = Bytes::from(file);
335    /// // Create metrics object to pass into the reader
336    /// let metrics = ArrowReaderMetrics::enabled();
337    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
338    ///   // Configure the builder to use the metrics by passing a clone
339    ///   .with_metrics(metrics.clone())
340    ///   // Build the reader
341    ///   .build().unwrap();
342    /// // .. read data from the reader ..
343    ///
344    /// // check the metrics
345    /// assert!(metrics.records_read_from_inner().is_some());
346    /// ```
347    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
348        Self { metrics, ..self }
349    }
350
351    /// Set the maximum size (per row group) of the predicate cache in bytes for
352    /// the async decoder.
353    ///
354    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
355    /// unlimited cache size.
356    ///
357    /// This cache is used to store decoded arrays that are used in
358    /// predicate evaluation ([`Self::with_row_filter`]).
359    ///
360    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
361    /// [this ticket] for more details and alternatives.
362    ///
363    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
364    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
365    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
366        Self {
367            max_predicate_cache_size,
368            ..self
369        }
370    }
371}
372
373/// Options that control how metadata is read for a parquet file
374///
375/// See [`ArrowReaderBuilder`] for how to configure how the column data
376/// is then read from the file, including projection and filter pushdown
377#[derive(Debug, Clone, Default)]
378pub struct ArrowReaderOptions {
379    /// Should the reader strip any user defined metadata from the Arrow schema
380    skip_arrow_metadata: bool,
381    /// If provided used as the schema hint when determining the Arrow schema,
382    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
383    ///
384    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
385    supplied_schema: Option<SchemaRef>,
386    /// Policy for reading offset and column indexes.
387    pub(crate) page_index_policy: PageIndexPolicy,
388    /// If encryption is enabled, the file decryption properties can be provided
389    #[cfg(feature = "encryption")]
390    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
391}
392
393impl ArrowReaderOptions {
394    /// Create a new [`ArrowReaderOptions`] with the default settings
395    pub fn new() -> Self {
396        Self::default()
397    }
398
399    /// Skip decoding the embedded arrow metadata (defaults to `false`)
400    ///
401    /// Parquet files generated by some writers may contain embedded arrow
402    /// schema and metadata.
403    /// This may not be correct or compatible with your system,
404    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
405    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
406        Self {
407            skip_arrow_metadata,
408            ..self
409        }
410    }
411
412    /// Provide a schema hint to use when reading the Parquet file.
413    ///
414    /// If provided, this schema takes precedence over any arrow schema embedded
415    /// in the metadata (see the [`arrow`] documentation for more details).
416    ///
417    /// If the provided schema is not compatible with the data stored in the
418    /// parquet file schema, an error will be returned when constructing the
419    /// builder.
420    ///
421    /// This option is only required if you want to explicitly control the
422    /// conversion of Parquet types to Arrow types, such as casting a column to
423    /// a different type. For example, if you wanted to read an Int64 in
424    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
425    ///
426    /// [`arrow`]: crate::arrow
427    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
428    ///
429    /// # Notes
430    ///
431    /// The provided schema must have the same number of columns as the parquet schema and
432    /// the column names must be the same.
433    ///
434    /// # Example
435    /// ```
436    /// use std::io::Bytes;
437    /// use std::sync::Arc;
438    /// use tempfile::tempfile;
439    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
440    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
441    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
442    /// use parquet::arrow::ArrowWriter;
443    ///
444    /// // Write data - schema is inferred from the data to be Int32
445    /// let file = tempfile().unwrap();
446    /// let batch = RecordBatch::try_from_iter(vec![
447    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
448    /// ]).unwrap();
449    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
450    /// writer.write(&batch).unwrap();
451    /// writer.close().unwrap();
452    ///
453    /// // Read the file back.
454    /// // Supply a schema that interprets the Int32 column as a Timestamp.
455    /// let supplied_schema = Arc::new(Schema::new(vec![
456    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
457    /// ]));
458    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
459    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
460    ///     file.try_clone().unwrap(),
461    ///     options
462    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
463    ///
464    /// // Create the reader and read the data using the supplied schema.
465    /// let mut reader = builder.build().unwrap();
466    /// let _batch = reader.next().unwrap().unwrap();
467    /// ```
468    pub fn with_schema(self, schema: SchemaRef) -> Self {
469        Self {
470            supplied_schema: Some(schema),
471            skip_arrow_metadata: true,
472            ..self
473        }
474    }
475
476    /// Enable reading [`PageIndex`], if present (defaults to `false`)
477    ///
    /// Some query engines can use the `PageIndex` to push down predicates to the
    /// parquet scan, potentially eliminating unnecessary IO.
480    ///
481    /// If this is enabled, [`ParquetMetaData::column_index`] and
482    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
483    /// information is present in the file.
484    ///
485    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
486    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
487    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
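    ///
    /// A minimal sketch of enabling the page index when opening a file (whether
    /// the indexes are actually populated depends on what the writer stored):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    /// // The column and offset indexes (if present) are now available via the metadata
    /// let _column_index = builder.metadata().column_index();
    /// let _offset_index = builder.metadata().offset_index();
    /// ```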
488    pub fn with_page_index(self, page_index: bool) -> Self {
489        let page_index_policy = PageIndexPolicy::from(page_index);
490
491        Self {
492            page_index_policy,
493            ..self
494        }
495    }
496
497    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
498    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
499        Self {
500            page_index_policy: policy,
501            ..self
502        }
503    }
504
505    /// Provide the file decryption properties to use when reading encrypted parquet files.
506    ///
507    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
508    #[cfg(feature = "encryption")]
509    pub fn with_file_decryption_properties(
510        self,
511        file_decryption_properties: FileDecryptionProperties,
512    ) -> Self {
513        Self {
514            file_decryption_properties: Some(file_decryption_properties),
515            ..self
516        }
517    }
518
519    /// Retrieve the currently set page index behavior.
520    ///
521    /// This can be set via [`with_page_index`][Self::with_page_index].
522    pub fn page_index(&self) -> bool {
523        self.page_index_policy != PageIndexPolicy::Skip
524    }
525
526    /// Retrieve the currently set file decryption properties.
527    ///
528    /// This can be set via
529    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
530    #[cfg(feature = "encryption")]
531    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
532        self.file_decryption_properties.as_ref()
533    }
534}
535
536/// The metadata necessary to construct a [`ArrowReaderBuilder`]
537///
538/// Note this structure is cheaply clone-able as it consists of several arcs.
539///
540/// This structure allows
541///
542/// 1. Loading metadata for a file once and then using that same metadata to
543///    construct multiple separate readers, for example, to distribute readers
544///    across multiple threads
545///
546/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
547///    from the file each time a reader is constructed.
548///
549/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
550#[derive(Debug, Clone)]
551pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known a priori
553    pub(crate) metadata: Arc<ParquetMetaData>,
554    /// The Arrow Schema
555    pub(crate) schema: SchemaRef,
556
557    pub(crate) fields: Option<Arc<ParquetField>>,
558}
559
560impl ArrowReaderMetadata {
561    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
562    ///
563    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
564    /// example of how this can be used
565    ///
566    /// # Notes
567    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, but
    /// `Self::metadata` is missing the page index, this function will attempt
    /// to load the page index by performing additional reads from the underlying reader.
571    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
572        let metadata =
573            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
574        #[cfg(feature = "encryption")]
575        let metadata =
576            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
577        let metadata = metadata.parse_and_finish(reader)?;
578        Self::try_new(Arc::new(metadata), options)
579    }
580
581    /// Create a new [`ArrowReaderMetadata`]
582    ///
583    /// # Notes
584    ///
585    /// This function does not attempt to load the PageIndex if not present in the metadata.
586    /// See [`Self::load`] for more details.
587    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
588        match options.supplied_schema {
589            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
590            None => {
591                let kv_metadata = match options.skip_arrow_metadata {
592                    true => None,
593                    false => metadata.file_metadata().key_value_metadata(),
594                };
595
596                let (schema, fields) = parquet_to_arrow_schema_and_fields(
597                    metadata.file_metadata().schema_descr(),
598                    ProjectionMask::all(),
599                    kv_metadata,
600                )?;
601
602                Ok(Self {
603                    metadata,
604                    schema: Arc::new(schema),
605                    fields: fields.map(Arc::new),
606                })
607            }
608        }
609    }
610
611    fn with_supplied_schema(
612        metadata: Arc<ParquetMetaData>,
613        supplied_schema: SchemaRef,
614    ) -> Result<Self> {
615        let parquet_schema = metadata.file_metadata().schema_descr();
616        let field_levels = parquet_to_arrow_field_levels(
617            parquet_schema,
618            ProjectionMask::all(),
619            Some(supplied_schema.fields()),
620        )?;
621        let fields = field_levels.fields;
622        let inferred_len = fields.len();
623        let supplied_len = supplied_schema.fields().len();
624        // Ensure the supplied schema has the same number of columns as the parquet schema.
625        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
626        // different lengths, but we check here to be safe.
627        if inferred_len != supplied_len {
628            return Err(arrow_err!(format!(
629                "Incompatible supplied Arrow schema: expected {} columns received {}",
630                inferred_len, supplied_len
631            )));
632        }
633
634        let mut errors = Vec::new();
635
636        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
637
638        for (field1, field2) in field_iter {
639            if field1.data_type() != field2.data_type() {
640                errors.push(format!(
641                    "data type mismatch for field {}: requested {:?} but found {:?}",
642                    field1.name(),
643                    field1.data_type(),
644                    field2.data_type()
645                ));
646            }
647            if field1.is_nullable() != field2.is_nullable() {
648                errors.push(format!(
649                    "nullability mismatch for field {}: expected {:?} but found {:?}",
650                    field1.name(),
651                    field1.is_nullable(),
652                    field2.is_nullable()
653                ));
654            }
655            if field1.metadata() != field2.metadata() {
656                errors.push(format!(
657                    "metadata mismatch for field {}: expected {:?} but found {:?}",
658                    field1.name(),
659                    field1.metadata(),
660                    field2.metadata()
661                ));
662            }
663        }
664
665        if !errors.is_empty() {
666            let message = errors.join(", ");
667            return Err(ParquetError::ArrowError(format!(
668                "Incompatible supplied Arrow schema: {message}",
669            )));
670        }
671
672        Ok(Self {
673            metadata,
674            schema: supplied_schema,
675            fields: field_levels.levels.map(Arc::new),
676        })
677    }
678
679    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
680    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
681        &self.metadata
682    }
683
684    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
685    pub fn parquet_schema(&self) -> &SchemaDescriptor {
686        self.metadata.file_metadata().schema_descr()
687    }
688
689    /// Returns the arrow [`SchemaRef`] for this parquet file
690    pub fn schema(&self) -> &SchemaRef {
691        &self.schema
692    }
693}
694
695#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
697pub struct SyncReader<T: ChunkReader>(T);
698
699impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
700    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
701        f.debug_tuple("SyncReader").field(&self.0).finish()
702    }
703}
704
705/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
706///
707/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
708///
709/// See [`ArrowReaderBuilder`] for additional member functions
710pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
711
712impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
713    /// Create a new [`ParquetRecordBatchReaderBuilder`]
714    ///
715    /// ```
716    /// # use std::sync::Arc;
717    /// # use bytes::Bytes;
718    /// # use arrow_array::{Int32Array, RecordBatch};
719    /// # use arrow_schema::{DataType, Field, Schema};
720    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
721    /// # use parquet::arrow::ArrowWriter;
722    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
723    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
724    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
725    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
726    /// # writer.write(&batch).unwrap();
727    /// # writer.close().unwrap();
728    /// # let file = Bytes::from(file);
729    /// #
730    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
731    ///
732    /// // Inspect metadata
733    /// assert_eq!(builder.metadata().num_row_groups(), 1);
734    ///
735    /// // Construct reader
736    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
737    ///
738    /// // Read data
739    /// let _batch = reader.next().unwrap().unwrap();
740    /// ```
741    pub fn try_new(reader: T) -> Result<Self> {
742        Self::try_new_with_options(reader, Default::default())
743    }
744
745    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
746    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
747        let metadata = ArrowReaderMetadata::load(&reader, options)?;
748        Ok(Self::new_with_metadata(reader, metadata))
749    }
750
751    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
752    ///
753    /// This interface allows:
754    ///
755    /// 1. Loading metadata once and using it to create multiple builders with
756    ///    potentially different settings or run on different threads
757    ///
758    /// 2. Using a cached copy of the metadata rather than re-reading it from the
759    ///    file each time a reader is constructed.
760    ///
761    /// See the docs on [`ArrowReaderMetadata`] for more details
762    ///
763    /// # Example
764    /// ```
765    /// # use std::fs::metadata;
766    /// # use std::sync::Arc;
767    /// # use bytes::Bytes;
768    /// # use arrow_array::{Int32Array, RecordBatch};
769    /// # use arrow_schema::{DataType, Field, Schema};
770    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
771    /// # use parquet::arrow::ArrowWriter;
772    /// #
773    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
774    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
775    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
776    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
777    /// # writer.write(&batch).unwrap();
778    /// # writer.close().unwrap();
779    /// # let file = Bytes::from(file);
780    /// #
781    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
782    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
783    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
784    ///
785    /// // Should be able to read from both in parallel
786    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
787    /// ```
788    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
789        Self::new_builder(SyncReader(input), metadata)
790    }
791
792    /// Read bloom filter for a column in a row group
793    ///
794    /// Returns `None` if the column does not have a bloom filter
795    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
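    ///
    /// A minimal sketch (assumes the file was written with bloom filters enabled;
    /// the in-memory data is illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let props = WriterProperties::builder().set_bloom_filter_enabled(true).build();
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(props)).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Read the bloom filter for the first column of the first row group
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
    ///     // A value that was written must be reported as (possibly) present;
    ///     // bloom filters can produce false positives but never false negatives
    ///     assert!(sbbf.check(&1_i32));
    /// }
    /// ```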
797    pub fn get_row_group_column_bloom_filter(
798        &mut self,
799        row_group_idx: usize,
800        column_idx: usize,
801    ) -> Result<Option<Sbbf>> {
802        let metadata = self.metadata.row_group(row_group_idx);
803        let column_metadata = metadata.column(column_idx);
804
805        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
806            offset
807                .try_into()
808                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
809        } else {
810            return Ok(None);
811        };
812
813        let buffer = match column_metadata.bloom_filter_length() {
814            Some(length) => self.input.0.get_bytes(offset, length as usize),
815            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
816        }?;
817
818        let (header, bitset_offset) =
819            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
820
821        match header.algorithm {
822            BloomFilterAlgorithm::BLOCK(_) => {
823                // this match exists to future proof the singleton algorithm enum
824            }
825        }
826        match header.compression {
827            BloomFilterCompression::UNCOMPRESSED(_) => {
828                // this match exists to future proof the singleton compression enum
829            }
830        }
831        match header.hash {
832            BloomFilterHash::XXHASH(_) => {
833                // this match exists to future proof the singleton hash enum
834            }
835        }
836
837        let bitset = match column_metadata.bloom_filter_length() {
838            Some(_) => buffer.slice(
839                (TryInto::<usize>::try_into(bitset_offset).unwrap()
840                    - TryInto::<usize>::try_into(offset).unwrap())..,
841            ),
842            None => {
843                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
844                    ParquetError::General("Bloom filter length is invalid".to_string())
845                })?;
846                self.input.0.get_bytes(bitset_offset, bitset_length)?
847            }
848        };
849        Ok(Some(Sbbf::new(&bitset)))
850    }
851
852    /// Build a [`ParquetRecordBatchReader`]
853    ///
854    /// Note: this will eagerly evaluate any `RowFilter` before returning
855    pub fn build(self) -> Result<ParquetRecordBatchReader> {
856        let Self {
857            input,
858            metadata,
859            schema: _,
860            fields,
861            batch_size: _,
862            row_groups,
863            projection,
864            mut filter,
865            selection,
866            limit,
867            offset,
868            metrics,
869            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
870            max_predicate_cache_size: _,
871        } = self;
872
        // Try to avoid allocating a large buffer
874        let batch_size = self
875            .batch_size
876            .min(metadata.file_metadata().num_rows() as usize);
877
878        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
879
880        let reader = ReaderRowGroups {
881            reader: Arc::new(input.0),
882            metadata,
883            row_groups,
884        };
885
886        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
887
888        // Update selection based on any filters
889        if let Some(filter) = filter.as_mut() {
890            for predicate in filter.predicates.iter_mut() {
891                // break early if we have ruled out all rows
892                if !plan_builder.selects_any() {
893                    break;
894                }
895
896                let mut cache_projection = predicate.projection().clone();
897                cache_projection.intersect(&projection);
898
899                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
900                    .build_array_reader(fields.as_deref(), predicate.projection())?;
901
902                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
903            }
904        }
905
906        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
907            .build_array_reader(fields.as_deref(), &projection)?;
908
909        let read_plan = plan_builder
910            .limited(reader.num_rows())
911            .with_offset(offset)
912            .with_limit(limit)
913            .build_limited()
914            .build();
915
916        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
917    }
918}
919
920struct ReaderRowGroups<T: ChunkReader> {
921    reader: Arc<T>,
922
923    metadata: Arc<ParquetMetaData>,
924    /// Optional list of row group indices to scan
925    row_groups: Vec<usize>,
926}
927
928impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
929    fn num_rows(&self) -> usize {
930        let meta = self.metadata.row_groups();
931        self.row_groups
932            .iter()
933            .map(|x| meta[*x].num_rows() as usize)
934            .sum()
935    }
936
937    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
938        Ok(Box::new(ReaderPageIterator {
939            column_idx: i,
940            reader: self.reader.clone(),
941            metadata: self.metadata.clone(),
942            row_groups: self.row_groups.clone().into_iter(),
943        }))
944    }
945}
946
947struct ReaderPageIterator<T: ChunkReader> {
948    reader: Arc<T>,
949    column_idx: usize,
950    row_groups: std::vec::IntoIter<usize>,
951    metadata: Arc<ParquetMetaData>,
952}
953
954impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
955    /// Return the next SerializedPageReader
956    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
957        let rg = self.metadata.row_group(rg_idx);
958        let column_chunk_metadata = rg.column(self.column_idx);
959        let offset_index = self.metadata.offset_index();
960        // `offset_index` may not exist and `i[rg_idx]` will be empty.
        // To avoid a panic on `i[rg_idx][self.column_idx]`, we need to filter out empty `i[rg_idx]`.
962        let page_locations = offset_index
963            .filter(|i| !i[rg_idx].is_empty())
964            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
965        let total_rows = rg.num_rows() as usize;
966        let reader = self.reader.clone();
967
968        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
969            .add_crypto_context(
970                rg_idx,
971                self.column_idx,
972                self.metadata.as_ref(),
973                column_chunk_metadata,
974            )
975    }
976}
977
978impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
979    type Item = Result<Box<dyn PageReader>>;
980
981    fn next(&mut self) -> Option<Self::Item> {
982        let rg_idx = self.row_groups.next()?;
983        let page_reader = self
984            .next_page_reader(rg_idx)
985            .map(|page_reader| Box::new(page_reader) as _);
986        Some(page_reader)
987    }
988}
989
990impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
991
992/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
993/// read from a parquet data source
994pub struct ParquetRecordBatchReader {
995    array_reader: Box<dyn ArrayReader>,
996    schema: SchemaRef,
997    read_plan: ReadPlan,
998}
999
1000impl Iterator for ParquetRecordBatchReader {
1001    type Item = Result<RecordBatch, ArrowError>;
1002
1003    fn next(&mut self) -> Option<Self::Item> {
1004        self.next_inner()
1005            .map_err(|arrow_err| arrow_err.into())
1006            .transpose()
1007    }
1008}
1009
1010impl ParquetRecordBatchReader {
1011    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1012    /// has reached the end of the file.
1013    ///
1014    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1015    /// simplify error handling with `?`
1016    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1017        let mut read_records = 0;
1018        let batch_size = self.batch_size();
1019        match self.read_plan.selection_mut() {
1020            Some(selection) => {
1021                while read_records < batch_size && !selection.is_empty() {
1022                    let front = selection.pop_front().unwrap();
1023                    if front.skip {
1024                        let skipped = self.array_reader.skip_records(front.row_count)?;
1025
1026                        if skipped != front.row_count {
1027                            return Err(general_err!(
1028                                "failed to skip rows, expected {}, got {}",
1029                                front.row_count,
1030                                skipped
1031                            ));
1032                        }
1033                        continue;
1034                    }
1035
                    // Currently, when a RowSelector with row_count = 0 is encountered, it is interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1038                    if front.row_count == 0 {
1039                        continue;
1040                    }
1041
1042                    // try to read record
1043                    let need_read = batch_size - read_records;
1044                    let to_read = match front.row_count.checked_sub(need_read) {
1045                        Some(remaining) if remaining != 0 => {
                            // If the page row count is less than batch_size, we must set the batch size to the page row count.
                            // This check also avoids an infinite loop.
1048                            selection.push_front(RowSelector::select(remaining));
1049                            need_read
1050                        }
1051                        _ => front.row_count,
1052                    };
1053                    match self.array_reader.read_records(to_read)? {
1054                        0 => break,
1055                        rec => read_records += rec,
1056                    };
1057                }
1058            }
1059            None => {
1060                self.array_reader.read_records(batch_size)?;
1061            }
1062        };
1063
1064        let array = self.array_reader.consume_batch()?;
1065        let struct_array = array.as_struct_opt().ok_or_else(|| {
1066            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1067        })?;
1068
1069        Ok(if struct_array.len() > 0 {
1070            Some(RecordBatch::from(struct_array))
1071        } else {
1072            None
1073        })
1074    }
1075}
1076
1077impl RecordBatchReader for ParquetRecordBatchReader {
1078    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1079    ///
1080    /// Note that the schema metadata will be stripped here. See
1081    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1082    fn schema(&self) -> SchemaRef {
1083        self.schema.clone()
1084    }
1085}
1086
1087impl ParquetRecordBatchReader {
1088    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1089    ///
1090    /// See [`ParquetRecordBatchReaderBuilder`] for more options
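    ///
    /// # Example
    ///
    /// A minimal sketch using an in-memory file (the data is illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // Read the whole file in batches of up to 1024 rows
    /// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// for batch in reader {
    ///     let batch = batch.unwrap();
    ///     assert_eq!(batch.num_columns(), 1);
    /// }
    /// ```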
1091    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1092        ParquetRecordBatchReaderBuilder::try_new(reader)?
1093            .with_batch_size(batch_size)
1094            .build()
1095    }
1096
1097    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1098    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
1100    /// higher-level interface for reading parquet data from a file
1101    pub fn try_new_with_row_groups(
1102        levels: &FieldLevels,
1103        row_groups: &dyn RowGroups,
1104        batch_size: usize,
1105        selection: Option<RowSelection>,
1106    ) -> Result<Self> {
1107        // note metrics are not supported in this API
1108        let metrics = ArrowReaderMetrics::disabled();
1109        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1110            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1111
1112        let read_plan = ReadPlanBuilder::new(batch_size)
1113            .with_selection(selection)
1114            .build();
1115
1116        Ok(Self {
1117            array_reader,
1118            schema: Arc::new(Schema::new(levels.fields.clone())),
1119            read_plan,
1120        })
1121    }
1122
1123    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1124    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1125    /// all rows will be returned
1126    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1127        let schema = match array_reader.get_data_type() {
1128            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
1129            _ => unreachable!("Struct array reader's data type is not struct!"),
1130        };
1131
1132        Self {
1133            array_reader,
1134            schema: Arc::new(schema),
1135            read_plan,
1136        }
1137    }
1138
1139    #[inline(always)]
1140    pub(crate) fn batch_size(&self) -> usize {
1141        self.read_plan.batch_size()
1142    }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use std::cmp::min;
1148    use std::collections::{HashMap, VecDeque};
1149    use std::fmt::Formatter;
1150    use std::fs::File;
1151    use std::io::Seek;
1152    use std::path::PathBuf;
1153    use std::sync::Arc;
1154
1155    use arrow_array::builder::*;
1156    use arrow_array::cast::AsArray;
1157    use arrow_array::types::{
1158        Date32Type, Date64Type, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
1159        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1160        Time64MicrosecondType,
1161    };
1162    use arrow_array::*;
1163    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
1164    use arrow_data::{ArrayData, ArrayDataBuilder};
1165    use arrow_schema::{
1166        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1167    };
1168    use arrow_select::concat::concat_batches;
1169    use bytes::Bytes;
1170    use half::f16;
1171    use num::PrimInt;
1172    use rand::{rng, Rng, RngCore};
1173    use tempfile::tempfile;
1174
1175    use crate::arrow::arrow_reader::{
1176        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1177        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1178    };
1179    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1180    use crate::arrow::{ArrowWriter, ProjectionMask};
1181    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1182    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1183    use crate::data_type::{
1184        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1185        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1186    };
1187    use crate::errors::Result;
1188    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1189    use crate::file::writer::SerializedFileWriter;
1190    use crate::schema::parser::parse_message_type;
1191    use crate::schema::types::{Type, TypePtr};
1192    use crate::util::test_common::rand_gen::RandGen;
1193
1194    #[test]
1195    fn test_arrow_reader_all_columns() {
1196        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1197
1198        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1199        let original_schema = Arc::clone(builder.schema());
1200        let reader = builder.build().unwrap();
1201
1202        // Verify that the schema was correctly parsed
1203        assert_eq!(original_schema.fields(), reader.schema().fields());
1204    }
1205
1206    #[test]
1207    fn test_arrow_reader_single_column() {
1208        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1209
1210        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1211        let original_schema = Arc::clone(builder.schema());
1212
1213        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1214        let reader = builder.with_projection(mask).build().unwrap();
1215
1216        // Verify that the schema was correctly parsed
1217        assert_eq!(1, reader.schema().fields().len());
1218        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1219    }
1220
1221    #[test]
1222    fn test_arrow_reader_single_column_by_name() {
1223        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1224
1225        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1226        let original_schema = Arc::clone(builder.schema());
1227
1228        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1229        let reader = builder.with_projection(mask).build().unwrap();
1230
1231        // Verify that the schema was correctly parsed
1232        assert_eq!(1, reader.schema().fields().len());
1233        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1234    }
1235
1236    #[test]
1237    fn test_null_column_reader_test() {
1238        let mut file = tempfile::tempfile().unwrap();
1239
1240        let schema = "
1241            message message {
1242                OPTIONAL INT32 int32;
1243            }
1244        ";
1245        let schema = Arc::new(parse_message_type(schema).unwrap());
1246
1247        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1248        generate_single_column_file_with_data::<Int32Type>(
1249            &[vec![], vec![]],
1250            Some(&def_levels),
1251            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1252            schema,
1253            Some(Field::new("int32", ArrowDataType::Null, true)),
1254            &Default::default(),
1255        )
1256        .unwrap();
1257
1258        file.rewind().unwrap();
1259
1260        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1261        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1262
1263        assert_eq!(batches.len(), 4);
1264        for batch in &batches[0..3] {
1265            assert_eq!(batch.num_rows(), 2);
1266            assert_eq!(batch.num_columns(), 1);
1267            assert_eq!(batch.column(0).null_count(), 2);
1268        }
1269
1270        assert_eq!(batches[3].num_rows(), 1);
1271        assert_eq!(batches[3].num_columns(), 1);
1272        assert_eq!(batches[3].column(0).null_count(), 1);
1273    }
1274
1275    #[test]
1276    fn test_primitive_single_column_reader_test() {
1277        run_single_column_reader_tests::<BoolType, _, BoolType>(
1278            2,
1279            ConvertedType::NONE,
1280            None,
1281            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1282            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1283        );
1284        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1285            2,
1286            ConvertedType::NONE,
1287            None,
1288            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1289            &[
1290                Encoding::PLAIN,
1291                Encoding::RLE_DICTIONARY,
1292                Encoding::DELTA_BINARY_PACKED,
1293                Encoding::BYTE_STREAM_SPLIT,
1294            ],
1295        );
1296        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1297            2,
1298            ConvertedType::NONE,
1299            None,
1300            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1301            &[
1302                Encoding::PLAIN,
1303                Encoding::RLE_DICTIONARY,
1304                Encoding::DELTA_BINARY_PACKED,
1305                Encoding::BYTE_STREAM_SPLIT,
1306            ],
1307        );
1308        run_single_column_reader_tests::<FloatType, _, FloatType>(
1309            2,
1310            ConvertedType::NONE,
1311            None,
1312            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1313            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1314        );
1315    }
1316
1317    #[test]
1318    fn test_unsigned_primitive_single_column_reader_test() {
1319        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1320            2,
1321            ConvertedType::UINT_32,
1322            Some(ArrowDataType::UInt32),
1323            |vals| {
1324                Arc::new(UInt32Array::from_iter(
1325                    vals.iter().map(|x| x.map(|x| x as u32)),
1326                ))
1327            },
1328            &[
1329                Encoding::PLAIN,
1330                Encoding::RLE_DICTIONARY,
1331                Encoding::DELTA_BINARY_PACKED,
1332            ],
1333        );
1334        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1335            2,
1336            ConvertedType::UINT_64,
1337            Some(ArrowDataType::UInt64),
1338            |vals| {
1339                Arc::new(UInt64Array::from_iter(
1340                    vals.iter().map(|x| x.map(|x| x as u64)),
1341                ))
1342            },
1343            &[
1344                Encoding::PLAIN,
1345                Encoding::RLE_DICTIONARY,
1346                Encoding::DELTA_BINARY_PACKED,
1347            ],
1348        );
1349    }
1350
1351    #[test]
1352    fn test_unsigned_roundtrip() {
1353        let schema = Arc::new(Schema::new(vec![
1354            Field::new("uint32", ArrowDataType::UInt32, true),
1355            Field::new("uint64", ArrowDataType::UInt64, true),
1356        ]));
1357
1358        let mut buf = Vec::with_capacity(1024);
1359        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1360
1361        let original = RecordBatch::try_new(
1362            schema,
1363            vec![
1364                Arc::new(UInt32Array::from_iter_values([
1365                    0,
1366                    i32::MAX as u32,
1367                    u32::MAX,
1368                ])),
1369                Arc::new(UInt64Array::from_iter_values([
1370                    0,
1371                    i64::MAX as u64,
1372                    u64::MAX,
1373                ])),
1374            ],
1375        )
1376        .unwrap();
1377
1378        writer.write(&original).unwrap();
1379        writer.close().unwrap();
1380
1381        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1382        let ret = reader.next().unwrap().unwrap();
1383        assert_eq!(ret, original);
1384
1385        // Check they can be downcast to the correct type
1386        ret.column(0)
1387            .as_any()
1388            .downcast_ref::<UInt32Array>()
1389            .unwrap();
1390
1391        ret.column(1)
1392            .as_any()
1393            .downcast_ref::<UInt64Array>()
1394            .unwrap();
1395    }
1396
1397    #[test]
1398    fn test_float16_roundtrip() -> Result<()> {
1399        let schema = Arc::new(Schema::new(vec![
1400            Field::new("float16", ArrowDataType::Float16, false),
1401            Field::new("float16-nullable", ArrowDataType::Float16, true),
1402        ]));
1403
1404        let mut buf = Vec::with_capacity(1024);
1405        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1406
1407        let original = RecordBatch::try_new(
1408            schema,
1409            vec![
1410                Arc::new(Float16Array::from_iter_values([
1411                    f16::EPSILON,
1412                    f16::MIN,
1413                    f16::MAX,
1414                    f16::NAN,
1415                    f16::INFINITY,
1416                    f16::NEG_INFINITY,
1417                    f16::ONE,
1418                    f16::NEG_ONE,
1419                    f16::ZERO,
1420                    f16::NEG_ZERO,
1421                    f16::E,
1422                    f16::PI,
1423                    f16::FRAC_1_PI,
1424                ])),
1425                Arc::new(Float16Array::from(vec![
1426                    None,
1427                    None,
1428                    None,
1429                    Some(f16::NAN),
1430                    Some(f16::INFINITY),
1431                    Some(f16::NEG_INFINITY),
1432                    None,
1433                    None,
1434                    None,
1435                    None,
1436                    None,
1437                    None,
1438                    Some(f16::FRAC_1_PI),
1439                ])),
1440            ],
1441        )?;
1442
1443        writer.write(&original)?;
1444        writer.close()?;
1445
1446        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1447        let ret = reader.next().unwrap()?;
1448        assert_eq!(ret, original);
1449
1450        // Ensure the columns can be downcast to the correct type
1451        ret.column(0).as_primitive::<Float16Type>();
1452        ret.column(1).as_primitive::<Float16Type>();
1453
1454        Ok(())
1455    }
1456
1457    #[test]
1458    fn test_time_utc_roundtrip() -> Result<()> {
1459        let schema = Arc::new(Schema::new(vec![
1460            Field::new(
1461                "time_millis",
1462                ArrowDataType::Time32(TimeUnit::Millisecond),
1463                true,
1464            )
1465            .with_metadata(HashMap::from_iter(vec![(
1466                "adjusted_to_utc".to_string(),
1467                "".to_string(),
1468            )])),
1469            Field::new(
1470                "time_micros",
1471                ArrowDataType::Time64(TimeUnit::Microsecond),
1472                true,
1473            )
1474            .with_metadata(HashMap::from_iter(vec![(
1475                "adjusted_to_utc".to_string(),
1476                "".to_string(),
1477            )])),
1478        ]));
1479
1480        let mut buf = Vec::with_capacity(1024);
1481        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1482
1483        let original = RecordBatch::try_new(
1484            schema,
1485            vec![
1486                Arc::new(Time32MillisecondArray::from(vec![
1487                    Some(-1),
1488                    Some(0),
1489                    Some(86_399_000),
1490                    Some(86_400_000),
1491                    Some(86_401_000),
1492                    None,
1493                ])),
1494                Arc::new(Time64MicrosecondArray::from(vec![
1495                    Some(-1),
1496                    Some(0),
1497                    Some(86_399 * 1_000_000),
1498                    Some(86_400 * 1_000_000),
1499                    Some(86_401 * 1_000_000),
1500                    None,
1501                ])),
1502            ],
1503        )?;
1504
1505        writer.write(&original)?;
1506        writer.close()?;
1507
1508        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1509        let ret = reader.next().unwrap()?;
1510        assert_eq!(ret, original);
1511
1512        // Ensure the columns can be downcast to the correct type
1513        ret.column(0).as_primitive::<Time32MillisecondType>();
1514        ret.column(1).as_primitive::<Time64MicrosecondType>();
1515
1516        Ok(())
1517    }
1518
1519    #[test]
1520    fn test_date32_roundtrip() -> Result<()> {
1521        use arrow_array::Date32Array;
1522
1523        let schema = Arc::new(Schema::new(vec![Field::new(
1524            "date32",
1525            ArrowDataType::Date32,
1526            false,
1527        )]));
1528
1529        let mut buf = Vec::with_capacity(1024);
1530
1531        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1532
1533        let original = RecordBatch::try_new(
1534            schema,
1535            vec![Arc::new(Date32Array::from(vec![
1536                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1537            ]))],
1538        )?;
1539
1540        writer.write(&original)?;
1541        writer.close()?;
1542
1543        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1544        let ret = reader.next().unwrap()?;
1545        assert_eq!(ret, original);
1546
1547        // Ensure the column can be downcast to the correct type
1548        ret.column(0).as_primitive::<Date32Type>();
1549
1550        Ok(())
1551    }
1552
1553    #[test]
1554    fn test_date64_roundtrip() -> Result<()> {
1555        use arrow_array::Date64Array;
1556
1557        let schema = Arc::new(Schema::new(vec![
1558            Field::new("small-date64", ArrowDataType::Date64, false),
1559            Field::new("big-date64", ArrowDataType::Date64, false),
1560            Field::new("invalid-date64", ArrowDataType::Date64, false),
1561        ]));
1562
1563        let mut default_buf = Vec::with_capacity(1024);
1564        let mut coerce_buf = Vec::with_capacity(1024);
1565
1566        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1567
1568        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1569        let mut coerce_writer =
1570            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1571
1572        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1573
1574        let original = RecordBatch::try_new(
1575            schema,
1576            vec![
1577                // small-date64
1578                Arc::new(Date64Array::from(vec![
1579                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1580                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1581                    0,
1582                    1_000 * NUM_MILLISECONDS_IN_DAY,
1583                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1584                ])),
1585                // big-date64
1586                Arc::new(Date64Array::from(vec![
1587                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1588                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1589                    0,
1590                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1591                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1592                ])),
1593                // invalid-date64
1594                Arc::new(Date64Array::from(vec![
1595                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1596                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1597                    1,
1598                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1599                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1600                ])),
1601            ],
1602        )?;
1603
1604        default_writer.write(&original)?;
1605        coerce_writer.write(&original)?;
1606
1607        default_writer.close()?;
1608        coerce_writer.close()?;
1609
1610        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1611        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1612
1613        let default_ret = default_reader.next().unwrap()?;
1614        let coerce_ret = coerce_reader.next().unwrap()?;
1615
1616        // Roundtrip should be successful when default writer used
1617        assert_eq!(default_ret, original);
1618
1619        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1620        assert_eq!(coerce_ret.column(0), original.column(0));
1621        assert_ne!(coerce_ret.column(1), original.column(1));
1622        assert_ne!(coerce_ret.column(2), original.column(2));
1623
1624        // Ensure both can be downcast to the correct type
1625        default_ret.column(0).as_primitive::<Date64Type>();
1626        coerce_ret.column(0).as_primitive::<Date64Type>();
1627
1628        Ok(())
1629    }
1630    struct RandFixedLenGen {}
1631
1632    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1633        fn gen(len: i32) -> FixedLenByteArray {
1634            let mut v = vec![0u8; len as usize];
1635            rng().fill_bytes(&mut v);
1636            ByteArray::from(v).into()
1637        }
1638    }
1639
1640    #[test]
1641    fn test_fixed_length_binary_column_reader() {
1642        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1643            20,
1644            ConvertedType::NONE,
1645            None,
1646            |vals| {
1647                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1648                for val in vals {
1649                    match val {
1650                        Some(b) => builder.append_value(b).unwrap(),
1651                        None => builder.append_null(),
1652                    }
1653                }
1654                Arc::new(builder.finish())
1655            },
1656            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1657        );
1658    }
1659
1660    #[test]
1661    fn test_interval_day_time_column_reader() {
1662        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1663            12,
1664            ConvertedType::INTERVAL,
1665            None,
1666            |vals| {
1667                Arc::new(
1668                    vals.iter()
1669                        .map(|x| {
1670                            x.as_ref().map(|b| IntervalDayTime {
1671                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1672                                milliseconds: i32::from_le_bytes(
1673                                    b.as_ref()[8..12].try_into().unwrap(),
1674                                ),
1675                            })
1676                        })
1677                        .collect::<IntervalDayTimeArray>(),
1678                )
1679            },
1680            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1681        );
1682    }
1683
1684    #[test]
1685    fn test_int96_single_column_reader_test() {
1686        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1687
1688        type TypeHintAndConversionFunction =
1689            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1690
1691        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1692            // Test without a specified ArrowType hint.
1693            (None, |vals: &[Option<Int96>]| {
1694                Arc::new(TimestampNanosecondArray::from_iter(
1695                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1696                )) as ArrayRef
1697            }),
1698            // Test other TimeUnits as ArrowType hints.
1699            (
1700                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1701                |vals: &[Option<Int96>]| {
1702                    Arc::new(TimestampSecondArray::from_iter(
1703                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1704                    )) as ArrayRef
1705                },
1706            ),
1707            (
1708                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1709                |vals: &[Option<Int96>]| {
1710                    Arc::new(TimestampMillisecondArray::from_iter(
1711                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1712                    )) as ArrayRef
1713                },
1714            ),
1715            (
1716                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1717                |vals: &[Option<Int96>]| {
1718                    Arc::new(TimestampMicrosecondArray::from_iter(
1719                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1720                    )) as ArrayRef
1721                },
1722            ),
1723            (
1724                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1725                |vals: &[Option<Int96>]| {
1726                    Arc::new(TimestampNanosecondArray::from_iter(
1727                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1728                    )) as ArrayRef
1729                },
1730            ),
1731            // Test another timezone with TimeUnit as ArrowType hints.
1732            (
1733                Some(ArrowDataType::Timestamp(
1734                    TimeUnit::Second,
1735                    Some(Arc::from("-05:00")),
1736                )),
1737                |vals: &[Option<Int96>]| {
1738                    Arc::new(
1739                        TimestampSecondArray::from_iter(
1740                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1741                        )
1742                        .with_timezone("-05:00"),
1743                    ) as ArrayRef
1744                },
1745            ),
1746        ];
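            // Each (Arrow type hint, converter) pair above is exercised against the
            // full single-column test matrix by the loop below.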
1747
1748        resolutions.iter().for_each(|(arrow_type, converter)| {
1749            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1750                2,
1751                ConvertedType::NONE,
1752                arrow_type.clone(),
1753                converter,
1754                encodings,
1755            );
1756        })
1757    }
1758
1759    #[test]
1760    fn test_int96_from_spark_file_with_provided_schema() {
1761        // int96_from_spark.parquet was written using Spark's microsecond timestamps, which trade
1762        // resolution for range compared to nanosecond timestamps. We must provide a schema with
1763        // microsecond resolution for the Parquet reader to interpret these values correctly.
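            // The microsecond schema is supplied to the reader below via
            // `ArrowReaderOptions::with_schema`.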
1764        use arrow_schema::DataType::Timestamp;
1765        let test_data = arrow::util::test_util::parquet_test_data();
1766        let path = format!("{test_data}/int96_from_spark.parquet");
1767        let file = File::open(path).unwrap();
1768
1769        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1770            "a",
1771            Timestamp(TimeUnit::Microsecond, None),
1772            true,
1773        )]));
1774        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1775
1776        let mut record_reader =
1777            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1778                .unwrap()
1779                .build()
1780                .unwrap();
1781
1782        let batch = record_reader.next().unwrap().unwrap();
1783        assert_eq!(batch.num_columns(), 1);
1784        let column = batch.column(0);
1785        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1786
1787        let expected = Arc::new(Int64Array::from(vec![
1788            Some(1704141296123456),
1789            Some(1704070800000000),
1790            Some(253402225200000000),
1791            Some(1735599600000000),
1792            None,
1793            Some(9089380393200000000),
1794        ]));
1795
1796        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1797        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1798        // anyway, so this should be a zero-copy non-modifying cast.
1799
1800        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1801        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1802
1803        assert_eq!(casted_timestamps.len(), expected.len());
1804
1805        casted_timestamps
1806            .iter()
1807            .zip(expected.iter())
1808            .for_each(|(lhs, rhs)| {
1809                assert_eq!(lhs, rhs);
1810            });
1811    }
1812
1813    #[test]
1814    fn test_int96_from_spark_file_without_provided_schema() {
1815        // int96_from_spark.parquet was written using Spark's microsecond timestamps, which trade
1816        // resolution for range compared to nanosecond timestamps. Without a provided schema, some
1817        // values overflow when read at nanosecond resolution and come back as garbage values.
1818        use arrow_schema::DataType::Timestamp;
1819        let test_data = arrow::util::test_util::parquet_test_data();
1820        let path = format!("{test_data}/int96_from_spark.parquet");
1821        let file = File::open(path).unwrap();
1822
1823        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1824            .unwrap()
1825            .build()
1826            .unwrap();
1827
1828        let batch = record_reader.next().unwrap().unwrap();
1829        assert_eq!(batch.num_columns(), 1);
1830        let column = batch.column(0);
1831        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1832
1833        let expected = Arc::new(Int64Array::from(vec![
1834            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1835            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1836            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1837            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1838            None,
1839            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1840        ]));
1841
1842        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1843        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1844        // anyway, so this should be a zero-copy non-modifying cast.
1845
1846        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1847        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1848
1849        assert_eq!(casted_timestamps.len(), expected.len());
1850
1851        casted_timestamps
1852            .iter()
1853            .zip(expected.iter())
1854            .for_each(|(lhs, rhs)| {
1855                assert_eq!(lhs, rhs);
1856            });
1857    }
1858
1859    struct RandUtf8Gen {}
1860
1861    impl RandGen<ByteArrayType> for RandUtf8Gen {
1862        fn gen(len: i32) -> ByteArray {
1863            Int32Type::gen(len).to_string().as_str().into()
1864        }
1865    }
1866
1867    #[test]
1868    fn test_utf8_single_column_reader_test() {
1869        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1870            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1871                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1872            })))
1873        }
1874
1875        let encodings = &[
1876            Encoding::PLAIN,
1877            Encoding::RLE_DICTIONARY,
1878            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1879            Encoding::DELTA_BYTE_ARRAY,
1880        ];
1881
1882        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1883            2,
1884            ConvertedType::NONE,
1885            None,
1886            |vals| {
1887                Arc::new(BinaryArray::from_iter(
1888                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1889                ))
1890            },
1891            encodings,
1892        );
1893
1894        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1895            2,
1896            ConvertedType::UTF8,
1897            None,
1898            string_converter::<i32>,
1899            encodings,
1900        );
1901
1902        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1903            2,
1904            ConvertedType::UTF8,
1905            Some(ArrowDataType::Utf8),
1906            string_converter::<i32>,
1907            encodings,
1908        );
1909
1910        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1911            2,
1912            ConvertedType::UTF8,
1913            Some(ArrowDataType::LargeUtf8),
1914            string_converter::<i64>,
1915            encodings,
1916        );
1917
1918        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1919        for key in &small_key_types {
1920            for encoding in encodings {
1921                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1922                opts.encoding = *encoding;
1923
1924                let data_type =
1925                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1926
1927                // Cannot run full test suite as keys overflow, run small test instead
1928                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1929                    opts,
1930                    2,
1931                    ConvertedType::UTF8,
1932                    Some(data_type.clone()),
1933                    move |vals| {
1934                        let vals = string_converter::<i32>(vals);
1935                        arrow::compute::cast(&vals, &data_type).unwrap()
1936                    },
1937                );
1938            }
1939        }
1940
1941        let key_types = [
1942            ArrowDataType::Int16,
1943            ArrowDataType::UInt16,
1944            ArrowDataType::Int32,
1945            ArrowDataType::UInt32,
1946            ArrowDataType::Int64,
1947            ArrowDataType::UInt64,
1948        ];
1949
1950        for key in &key_types {
1951            let data_type =
1952                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1953
1954            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1955                2,
1956                ConvertedType::UTF8,
1957                Some(data_type.clone()),
1958                move |vals| {
1959                    let vals = string_converter::<i32>(vals);
1960                    arrow::compute::cast(&vals, &data_type).unwrap()
1961                },
1962                encodings,
1963            );
1964
1965            // https://github.com/apache/arrow-rs/issues/1179
1966            // let data_type = ArrowDataType::Dictionary(
1967            //     Box::new(key.clone()),
1968            //     Box::new(ArrowDataType::LargeUtf8),
1969            // );
1970            //
1971            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1972            //     2,
1973            //     ConvertedType::UTF8,
1974            //     Some(data_type.clone()),
1975            //     move |vals| {
1976            //         let vals = string_converter::<i64>(vals);
1977            //         arrow::compute::cast(&vals, &data_type).unwrap()
1978            //     },
1979            //     encodings,
1980            // );
1981        }
1982    }
1983
1984    #[test]
1985    fn test_decimal_nullable_struct() {
1986        let decimals = Decimal256Array::from_iter_values(
1987            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1988        );
1989
1990        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1991            "decimals",
1992            decimals.data_type().clone(),
1993            false,
1994        )])))
1995        .len(8)
1996        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1997        .child_data(vec![decimals.into_data()])
1998        .build()
1999        .unwrap();
2000
2001        let written =
2002            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2003                .unwrap();
2004
2005        let mut buffer = Vec::with_capacity(1024);
2006        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2007        writer.write(&written).unwrap();
2008        writer.close().unwrap();
2009
2010        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2011            .unwrap()
2012            .collect::<Result<Vec<_>, _>>()
2013            .unwrap();
2014
2015        assert_eq!(&written.slice(0, 3), &read[0]);
2016        assert_eq!(&written.slice(3, 3), &read[1]);
2017        assert_eq!(&written.slice(6, 2), &read[2]);
2018    }
2019
2020    #[test]
2021    fn test_int32_nullable_struct() {
2022        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2023        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2024            "int32",
2025            int32.data_type().clone(),
2026            false,
2027        )])))
2028        .len(8)
2029        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2030        .child_data(vec![int32.into_data()])
2031        .build()
2032        .unwrap();
2033
2034        let written =
2035            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2036                .unwrap();
2037
2038        let mut buffer = Vec::with_capacity(1024);
2039        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2040        writer.write(&written).unwrap();
2041        writer.close().unwrap();
2042
2043        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2044            .unwrap()
2045            .collect::<Result<Vec<_>, _>>()
2046            .unwrap();
2047
2048        assert_eq!(&written.slice(0, 3), &read[0]);
2049        assert_eq!(&written.slice(3, 3), &read[1]);
2050        assert_eq!(&written.slice(6, 2), &read[2]);
2051    }
2052
2053    #[test]
2054    fn test_decimal_list() {
2055        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2056
2057        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
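            // The offsets buffer [0, 0, 1, 3, 3, 4, 5, 8] delimits the 7 lists, and the
            // validity bitmap 0b01010111 nulls out slots 3 and 5.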
2058        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2059            decimals.data_type().clone(),
2060            false,
2061        ))))
2062        .len(7)
2063        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2064        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2065        .child_data(vec![decimals.into_data()])
2066        .build()
2067        .unwrap();
2068
2069        let written =
2070            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2071                .unwrap();
2072
2073        let mut buffer = Vec::with_capacity(1024);
2074        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2075        writer.write(&written).unwrap();
2076        writer.close().unwrap();
2077
2078        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2079            .unwrap()
2080            .collect::<Result<Vec<_>, _>>()
2081            .unwrap();
2082
2083        assert_eq!(&written.slice(0, 3), &read[0]);
2084        assert_eq!(&written.slice(3, 3), &read[1]);
2085        assert_eq!(&written.slice(6, 1), &read[2]);
2086    }
2087
2088    #[test]
2089    fn test_read_decimal_file() {
2090        use arrow_array::Decimal128Array;
2091        let testdata = arrow::util::test_util::parquet_test_data();
2092        let file_variants = vec![
2093            ("byte_array", 4),
2094            ("fixed_length", 25),
2095            ("int32", 4),
2096            ("int64", 10),
2097        ];
2098        for (prefix, target_precision) in file_variants {
2099            let path = format!("{testdata}/{prefix}_decimal.parquet");
2100            let file = File::open(path).unwrap();
2101            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2102
2103            let batch = record_reader.next().unwrap().unwrap();
2104            assert_eq!(batch.num_rows(), 24);
2105            let col = batch
2106                .column(0)
2107                .as_any()
2108                .downcast_ref::<Decimal128Array>()
2109                .unwrap();
2110
2111            let expected = 1..25;
2112
2113            assert_eq!(col.precision(), target_precision);
2114            assert_eq!(col.scale(), 2);
2115
2116            for (i, v) in expected.enumerate() {
2117                assert_eq!(col.value(i), v * 100_i128);
2118            }
2119        }
2120    }
2121
2122    #[test]
2123    fn test_read_float16_nonzeros_file() {
2124        use arrow_array::Float16Array;
2125        let testdata = arrow::util::test_util::parquet_test_data();
2126        // see https://github.com/apache/parquet-testing/pull/40
2127        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2128        let file = File::open(path).unwrap();
2129        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2130
2131        let batch = record_reader.next().unwrap().unwrap();
2132        assert_eq!(batch.num_rows(), 8);
2133        let col = batch
2134            .column(0)
2135            .as_any()
2136            .downcast_ref::<Float16Array>()
2137            .unwrap();
2138
2139        let f16_two = f16::ONE + f16::ONE;
2140
2141        assert_eq!(col.null_count(), 1);
2142        assert!(col.is_null(0));
2143        assert_eq!(col.value(1), f16::ONE);
2144        assert_eq!(col.value(2), -f16_two);
2145        assert!(col.value(3).is_nan());
2146        assert_eq!(col.value(4), f16::ZERO);
2147        assert!(col.value(4).is_sign_positive());
2148        assert_eq!(col.value(5), f16::NEG_ONE);
2149        assert_eq!(col.value(6), f16::NEG_ZERO);
2150        assert!(col.value(6).is_sign_negative());
2151        assert_eq!(col.value(7), f16_two);
2152    }
2153
2154    #[test]
2155    fn test_read_float16_zeros_file() {
2156        use arrow_array::Float16Array;
2157        let testdata = arrow::util::test_util::parquet_test_data();
2158        // see https://github.com/apache/parquet-testing/pull/40
2159        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2160        let file = File::open(path).unwrap();
2161        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2162
2163        let batch = record_reader.next().unwrap().unwrap();
2164        assert_eq!(batch.num_rows(), 3);
2165        let col = batch
2166            .column(0)
2167            .as_any()
2168            .downcast_ref::<Float16Array>()
2169            .unwrap();
2170
2171        assert_eq!(col.null_count(), 1);
2172        assert!(col.is_null(0));
2173        assert_eq!(col.value(1), f16::ZERO);
2174        assert!(col.value(1).is_sign_positive());
2175        assert!(col.value(2).is_nan());
2176    }
2177
2178    #[test]
2179    fn test_read_float32_float64_byte_stream_split() {
2180        let path = format!(
2181            "{}/byte_stream_split.zstd.parquet",
2182            arrow::util::test_util::parquet_test_data(),
2183        );
2184        let file = File::open(path).unwrap();
2185        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2186
2187        let mut row_count = 0;
2188        for batch in record_reader {
2189            let batch = batch.unwrap();
2190            row_count += batch.num_rows();
2191            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2192            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2193
2194            // This file contains floats from a standard normal distribution
2195            for &x in f32_col.values() {
2196                assert!(x > -10.0);
2197                assert!(x < 10.0);
2198            }
2199            for &x in f64_col.values() {
2200                assert!(x > -10.0);
2201                assert!(x < 10.0);
2202            }
2203        }
2204        assert_eq!(row_count, 300);
2205    }
2206
2207    #[test]
2208    fn test_read_extended_byte_stream_split() {
2209        let path = format!(
2210            "{}/byte_stream_split_extended.gzip.parquet",
2211            arrow::util::test_util::parquet_test_data(),
2212        );
2213        let file = File::open(path).unwrap();
2214        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2215
2216        let mut row_count = 0;
2217        for batch in record_reader {
2218            let batch = batch.unwrap();
2219            row_count += batch.num_rows();
2220
2221            // 0,1 are f16
2222            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2223            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2224            assert_eq!(f16_col.len(), f16_bss.len());
2225            f16_col
2226                .iter()
2227                .zip(f16_bss.iter())
2228                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2229
2230            // 2,3 are f32
2231            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2232            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2233            assert_eq!(f32_col.len(), f32_bss.len());
2234            f32_col
2235                .iter()
2236                .zip(f32_bss.iter())
2237                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2238
2239            // 4,5 are f64
2240            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2241            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2242            assert_eq!(f64_col.len(), f64_bss.len());
2243            f64_col
2244                .iter()
2245                .zip(f64_bss.iter())
2246                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2247
2248            // 6,7 are i32
2249            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2250            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2251            assert_eq!(i32_col.len(), i32_bss.len());
2252            i32_col
2253                .iter()
2254                .zip(i32_bss.iter())
2255                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2256
2257            // 8,9 are i64
2258            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2259            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2260            assert_eq!(i64_col.len(), i64_bss.len());
2261            i64_col
2262                .iter()
2263                .zip(i64_bss.iter())
2264                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2265
2266            // 10,11 are FLBA(5)
2267            let flba_col = batch.column(10).as_fixed_size_binary();
2268            let flba_bss = batch.column(11).as_fixed_size_binary();
2269            assert_eq!(flba_col.len(), flba_bss.len());
2270            flba_col
2271                .iter()
2272                .zip(flba_bss.iter())
2273                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2274
2275            // 12,13 are FLBA(4) (decimal(7,3))
2276            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2277            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2278            assert_eq!(dec_col.len(), dec_bss.len());
2279            dec_col
2280                .iter()
2281                .zip(dec_bss.iter())
2282                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2283        }
2284        assert_eq!(row_count, 200);
2285    }
2286
2287    #[test]
2288    fn test_read_incorrect_map_schema_file() {
2289        let testdata = arrow::util::test_util::parquet_test_data();
2290        // see https://github.com/apache/parquet-testing/pull/47
2291        let path = format!("{testdata}/incorrect_map_schema.parquet");
2292        let file = File::open(path).unwrap();
2293        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2294
2295        let batch = record_reader.next().unwrap().unwrap();
2296        assert_eq!(batch.num_rows(), 1);
2297
2298        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2299            "my_map",
2300            ArrowDataType::Map(
2301                Arc::new(Field::new(
2302                    "key_value",
2303                    ArrowDataType::Struct(Fields::from(vec![
2304                        Field::new("key", ArrowDataType::Utf8, false),
2305                        Field::new("value", ArrowDataType::Utf8, true),
2306                    ])),
2307                    false,
2308                )),
2309                false,
2310            ),
2311            true,
2312        )]));
2313        assert_eq!(batch.schema().as_ref(), &expected_schema);
2314
2315        assert_eq!(batch.num_rows(), 1);
2316        assert_eq!(batch.column(0).null_count(), 0);
2317        assert_eq!(
2318            batch.column(0).as_map().keys().as_ref(),
2319            &StringArray::from(vec!["parent", "name"])
2320        );
2321        assert_eq!(
2322            batch.column(0).as_map().values().as_ref(),
2323            &StringArray::from(vec!["another", "report"])
2324        );
2325    }
2326
2327    #[test]
2328    fn test_read_dict_fixed_size_binary() {
2329        let schema = Arc::new(Schema::new(vec![Field::new(
2330            "a",
2331            ArrowDataType::Dictionary(
2332                Box::new(ArrowDataType::UInt8),
2333                Box::new(ArrowDataType::FixedSizeBinary(8)),
2334            ),
2335            true,
2336        )]));
2337        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2338        let values = FixedSizeBinaryArray::try_from_iter(
2339            vec![
2340                (0u8..8u8).collect::<Vec<u8>>(),
2341                (24u8..32u8).collect::<Vec<u8>>(),
2342            ]
2343            .into_iter(),
2344        )
2345        .unwrap();
2346        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2347        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2348
2349        let mut buffer = Vec::with_capacity(1024);
2350        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2351        writer.write(&batch).unwrap();
2352        writer.close().unwrap();
2353        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2354            .unwrap()
2355            .collect::<Result<Vec<_>, _>>()
2356            .unwrap();
2357
2358        assert_eq!(read.len(), 1);
2359        assert_eq!(&batch, &read[0])
2360    }
2361
2362    /// Parameters for single_column_reader_test
2363    #[derive(Clone)]
2364    struct TestOptions {
2365        /// Number of row groups to write to parquet (the file contains
2366        /// `num_row_groups * num_rows` rows in total)
2367        num_row_groups: usize,
2368        /// Number of rows in each row group
2369        num_rows: usize,
2370        /// Size of batches to read back
2371        record_batch_size: usize,
2372        /// Percentage of nulls in the column, or None if the column is required
2373        null_percent: Option<usize>,
2374        /// Set write batch size
2375        ///
2376        /// This is the number of rows that are written at once to a page and
2377        /// therefore acts as a bound on the page granularity of a row group
2378        write_batch_size: usize,
2379        /// Maximum size of page in bytes
2380        max_data_page_size: usize,
2381        /// Maximum size of dictionary page in bytes
2382        max_dict_page_size: usize,
2383        /// Writer version
2384        writer_version: WriterVersion,
2385        /// Enabled statistics
2386        enabled_statistics: EnabledStatistics,
2387        /// Encoding
2388        encoding: Encoding,
2389        /// row selections and total selected row count
2390        row_selections: Option<(RowSelection, usize)>,
2391        /// row filter
2392        row_filter: Option<Vec<bool>>,
2393        /// limit
2394        limit: Option<usize>,
2395        /// offset
2396        offset: Option<usize>,
2397    }
2398
2399    /// Manually implement `Debug` to avoid printing the entire contents of `row_selections` and `row_filter`
2400    impl std::fmt::Debug for TestOptions {
2401        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2402            f.debug_struct("TestOptions")
2403                .field("num_row_groups", &self.num_row_groups)
2404                .field("num_rows", &self.num_rows)
2405                .field("record_batch_size", &self.record_batch_size)
2406                .field("null_percent", &self.null_percent)
2407                .field("write_batch_size", &self.write_batch_size)
2408                .field("max_data_page_size", &self.max_data_page_size)
2409                .field("max_dict_page_size", &self.max_dict_page_size)
2410                .field("writer_version", &self.writer_version)
2411                .field("enabled_statistics", &self.enabled_statistics)
2412                .field("encoding", &self.encoding)
2413                .field("row_selections", &self.row_selections.is_some())
2414                .field("row_filter", &self.row_filter.is_some())
2415                .field("limit", &self.limit)
2416                .field("offset", &self.offset)
2417                .finish()
2418        }
2419    }
2420
2421    impl Default for TestOptions {
2422        fn default() -> Self {
2423            Self {
2424                num_row_groups: 2,
2425                num_rows: 100,
2426                record_batch_size: 15,
2427                null_percent: None,
2428                write_batch_size: 64,
2429                max_data_page_size: 1024 * 1024,
2430                max_dict_page_size: 1024 * 1024,
2431                writer_version: WriterVersion::PARQUET_1_0,
2432                enabled_statistics: EnabledStatistics::Page,
2433                encoding: Encoding::PLAIN,
2434                row_selections: None,
2435                row_filter: None,
2436                limit: None,
2437                offset: None,
2438            }
2439        }
2440    }
2441
2442    impl TestOptions {
2443        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2444            Self {
2445                num_row_groups,
2446                num_rows,
2447                record_batch_size,
2448                ..Default::default()
2449            }
2450        }
2451
2452        fn with_null_percent(self, null_percent: usize) -> Self {
2453            Self {
2454                null_percent: Some(null_percent),
2455                ..self
2456            }
2457        }
2458
2459        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2460            Self {
2461                max_data_page_size,
2462                ..self
2463            }
2464        }
2465
2466        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2467            Self {
2468                max_dict_page_size,
2469                ..self
2470            }
2471        }
2472
2473        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2474            Self {
2475                enabled_statistics,
2476                ..self
2477            }
2478        }
2479
2480        fn with_row_selections(self) -> Self {
2481            assert!(self.row_filter.is_none(), "Must set row selection first");
2482
2483            let mut rng = rng();
2484            let step = rng.random_range(self.record_batch_size..self.num_rows);
2485            let row_selections = create_test_selection(
2486                step,
2487                self.num_row_groups * self.num_rows,
2488                rng.random::<bool>(),
2489            );
2490            Self {
2491                row_selections: Some(row_selections),
2492                ..self
2493            }
2494        }
2495
2496        fn with_row_filter(self) -> Self {
2497            let row_count = match &self.row_selections {
2498                Some((_, count)) => *count,
2499                None => self.num_row_groups * self.num_rows,
2500            };
2501
2502            let mut rng = rng();
2503            Self {
2504                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2505                ..self
2506            }
2507        }
2508
2509        fn with_limit(self, limit: usize) -> Self {
2510            Self {
2511                limit: Some(limit),
2512                ..self
2513            }
2514        }
2515
2516        fn with_offset(self, offset: usize) -> Self {
2517            Self {
2518                offset: Some(offset),
2519                ..self
2520            }
2521        }
2522
2523        fn writer_props(&self) -> WriterProperties {
2524            let builder = WriterProperties::builder()
2525                .set_data_page_size_limit(self.max_data_page_size)
2526                .set_write_batch_size(self.write_batch_size)
2527                .set_writer_version(self.writer_version)
2528                .set_statistics_enabled(self.enabled_statistics);
2529
2530            let builder = match self.encoding {
2531                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2532                    .set_dictionary_enabled(true)
2533                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2534                _ => builder
2535                    .set_dictionary_enabled(false)
2536                    .set_encoding(self.encoding),
2537            };
2538
2539            builder.build()
2540        }
2541    }
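
        // A minimal sketch of how the options above compose; this mirrors one of the
        // entries in `run_single_column_reader_tests` below and is illustrative only:
        //
        //     let opts = TestOptions::new(2, 256, 93) // 2 row groups, 256 rows each, 93-row batches
        //         .with_null_percent(25)              // ~25% nulls in the column
        //         .with_row_selections()              // layer a random RowSelection on top
        //         .with_limit(10);                    // then cap the number of rows read
        //     let _props = opts.writer_props();       // translate the options into WriterProperties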
2542
2543    /// Create parquet files and read them back with
2544    /// `ParquetRecordBatchReaderBuilder`, once for each of a standard set of
2545    /// `TestOptions`.
2546    ///
2547    /// `rand_max` represents the maximum size of value to pass to the
2548    /// value generator
2549    fn run_single_column_reader_tests<T, F, G>(
2550        rand_max: i32,
2551        converted_type: ConvertedType,
2552        arrow_type: Option<ArrowDataType>,
2553        converter: F,
2554        encodings: &[Encoding],
2555    ) where
2556        T: DataType,
2557        G: RandGen<T>,
2558        F: Fn(&[Option<T::T>]) -> ArrayRef,
2559    {
2560        let all_options = vec![
2561            // choose record_batch_size (15) so batches cross row
2562            // group boundaries (2 row groups of 100 rows each).
2563            TestOptions::new(2, 100, 15),
2564            // choose record_batch_size (5) so batches sometimes fall
2565            // on row group boundaries (3 row groups of
2566            // 25 rows each). Tests buffer
2567            // refilling edge cases.
2568            TestOptions::new(3, 25, 5),
2569            // Choose record_batch_size (25) so batch boundaries line up
2570            // exactly with row group boundaries (4 row groups of 100 rows). Tests buffer
2571            // refilling edge cases.
2572            TestOptions::new(4, 100, 25),
2573            // Set maximum page size so row groups have multiple pages
2574            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2575            // Set small dictionary page size to test dictionary fallback
2576            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2577            // Test optional but with no nulls
2578            TestOptions::new(2, 256, 127).with_null_percent(0),
2579            // Test optional with nulls
2580            TestOptions::new(2, 256, 93).with_null_percent(25),
2581            // Test with limit of 0
2582            TestOptions::new(4, 100, 25).with_limit(0),
2583            // Test with limit of 50
2584            TestOptions::new(4, 100, 25).with_limit(50),
2585            // Test with limit smaller than number of rows
2586            TestOptions::new(4, 100, 25).with_limit(10),
2587            // Test with limit larger than number of rows
2588            TestOptions::new(4, 100, 25).with_limit(101),
2589            // Test with limit + offset smaller than number of rows
2590            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2591            // Test with limit + offset equal to number of rows
2592            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2593            // Test with limit + offset larger than number of rows
2594            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2595            // Test with no page-level statistics
2596            TestOptions::new(2, 256, 91)
2597                .with_null_percent(25)
2598                .with_enabled_statistics(EnabledStatistics::Chunk),
2599            // Test with no statistics
2600            TestOptions::new(2, 256, 91)
2601                .with_null_percent(25)
2602                .with_enabled_statistics(EnabledStatistics::None),
2603            // Test with all null
2604            TestOptions::new(2, 128, 91)
2605                .with_null_percent(100)
2606                .with_enabled_statistics(EnabledStatistics::None),
2607            // Test skip
2608
2609            // choose record_batch_size (15) so batches cross row
2610            // group boundaries (2 row groups of 100 rows each).
2611            TestOptions::new(2, 100, 15).with_row_selections(),
2612            // choose record_batch_size (5) so batches sometimes fall
2613            // on row group boundaries (3 row groups of
2614            // 25 rows each). Tests buffer
2615            // refilling edge cases.
2616            TestOptions::new(3, 25, 5).with_row_selections(),
2617            // Choose record_batch_size (25) so batch boundaries line up
2618            // exactly with row group boundaries (4 row groups of 100 rows). Tests buffer
2619            // refilling edge cases.
2620            TestOptions::new(4, 100, 25).with_row_selections(),
2621            // Set maximum page size so row groups have multiple pages
2622            TestOptions::new(3, 256, 73)
2623                .with_max_data_page_size(128)
2624                .with_row_selections(),
2625            // Set small dictionary page size to test dictionary fallback
2626            TestOptions::new(3, 256, 57)
2627                .with_max_dict_page_size(128)
2628                .with_row_selections(),
2629            // Test optional but with no nulls
2630            TestOptions::new(2, 256, 127)
2631                .with_null_percent(0)
2632                .with_row_selections(),
2633            // Test optional with nulls
2634            TestOptions::new(2, 256, 93)
2635                .with_null_percent(25)
2636                .with_row_selections(),
2637            // Test optional with nulls
2638            TestOptions::new(2, 256, 93)
2639                .with_null_percent(25)
2640                .with_row_selections()
2641                .with_limit(10),
2642            // Test optional with nulls
2643            TestOptions::new(2, 256, 93)
2644                .with_null_percent(25)
2645                .with_row_selections()
2646                .with_offset(20)
2647                .with_limit(10),
2648            // Test filter
2649
2650            // Test with row filter
2651            TestOptions::new(4, 100, 25).with_row_filter(),
2652            // Test with row selection and row filter
2653            TestOptions::new(4, 100, 25)
2654                .with_row_selections()
2655                .with_row_filter(),
2656            // Test with nulls and row filter
2657            TestOptions::new(2, 256, 93)
2658                .with_null_percent(25)
2659                .with_max_data_page_size(10)
2660                .with_row_filter(),
2661            // Test with nulls and row filter and small pages
2662            TestOptions::new(2, 256, 93)
2663                .with_null_percent(25)
2664                .with_max_data_page_size(10)
2665                .with_row_selections()
2666                .with_row_filter(),
2667            // Test with row selection and no offset index and small pages
2668            TestOptions::new(2, 256, 93)
2669                .with_enabled_statistics(EnabledStatistics::None)
2670                .with_max_data_page_size(10)
2671                .with_row_selections(),
2672        ];
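            // Every TestOptions above is additionally expanded across both writer
            // versions and every requested encoding by the loop below.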
2673
2674        all_options.into_iter().for_each(|opts| {
2675            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2676                for encoding in encodings {
2677                    let opts = TestOptions {
2678                        writer_version,
2679                        encoding: *encoding,
2680                        ..opts.clone()
2681                    };
2682
2683                    single_column_reader_test::<T, _, G>(
2684                        opts,
2685                        rand_max,
2686                        converted_type,
2687                        arrow_type.clone(),
2688                        &converter,
2689                    )
2690                }
2691            }
2692        });
2693    }
2694
2695    /// Create a parquet file and then read it back with
2696    /// `ParquetRecordBatchReaderBuilder`, using the parameters described in
2697    /// `opts`.
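    ///
    /// The generated data is written with `generate_single_column_file_with_data`,
    /// and the reader is then configured from `opts` (row selection, row filter,
    /// offset and limit) before the results are compared against the expected rows.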
2698    fn single_column_reader_test<T, F, G>(
2699        opts: TestOptions,
2700        rand_max: i32,
2701        converted_type: ConvertedType,
2702        arrow_type: Option<ArrowDataType>,
2703        converter: F,
2704    ) where
2705        T: DataType,
2706        G: RandGen<T>,
2707        F: Fn(&[Option<T::T>]) -> ArrayRef,
2708    {
2709        // Print out options to facilitate debugging failures on CI
2710        println!(
2711            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2712            T::get_physical_type(), converted_type, arrow_type, opts
2713        );
2714
2715        // Generate def_levels according to null_percent
2716        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2717            Some(null_percent) => {
2718                let mut rng = rng();
2719
2720                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2721                    .map(|_| {
2722                        std::iter::from_fn(|| {
2723                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2724                        })
2725                        .take(opts.num_rows)
2726                        .collect()
2727                    })
2728                    .collect();
2729                (Repetition::OPTIONAL, Some(def_levels))
2730            }
2731            None => (Repetition::REQUIRED, None),
2732        };
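            // A definition level of 1 marks a non-null slot; the number of zeros in each
            // row group determines how many fewer values are generated below.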
2733
2734        // Generate random table data
2735        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2736            .map(|idx| {
2737                let null_count = match def_levels.as_ref() {
2738                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2739                    None => 0,
2740                };
2741                G::gen_vec(rand_max, opts.num_rows - null_count)
2742            })
2743            .collect();
2744
2745        let len = match T::get_physical_type() {
2746            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2747            crate::basic::Type::INT96 => 12,
2748            _ => -1,
2749        };
2750
2751        let fields = vec![Arc::new(
2752            Type::primitive_type_builder("leaf", T::get_physical_type())
2753                .with_repetition(repetition)
2754                .with_converted_type(converted_type)
2755                .with_length(len)
2756                .build()
2757                .unwrap(),
2758        )];
2759
2760        let schema = Arc::new(
2761            Type::group_type_builder("test_schema")
2762                .with_fields(fields)
2763                .build()
2764                .unwrap(),
2765        );
2766
2767        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2768
2769        let mut file = tempfile::tempfile().unwrap();
2770
2771        generate_single_column_file_with_data::<T>(
2772            &values,
2773            def_levels.as_ref(),
2774            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2775            schema,
2776            arrow_field,
2777            &opts,
2778        )
2779        .unwrap();
2780
2781        file.rewind().unwrap();
2782
2783        let options = ArrowReaderOptions::new()
2784            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2785
2786        let mut builder =
2787            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2788
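        // When a row selection is configured, derive the expected output by
        // replaying the selectors against the flattened column data; otherwise
        // every generated row is expected.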
2789        let expected_data = match opts.row_selections {
2790            Some((selections, row_count)) => {
2791                let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2792
2793                let mut selected_data: Vec<Option<T::T>> = vec![];
2794                let deque: VecDeque<RowSelector> = selections.clone().into();
2795                for select in deque {
2796                    if select.skip {
2797                        remaining_data.drain(0..select.row_count);
2798                    } else {
2799                        selected_data.extend(remaining_data.drain(0..select.row_count));
2800                    }
2801                }
2802                builder = builder.with_row_selection(selections);
2803
2804                assert_eq!(selected_data.len(), row_count);
2805                selected_data
2806            }
2807            None => {
2808                // Get the flattened table data
2809                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2810                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2811                expected_data
2812            }
2813        };
2814
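        // When a row filter is configured, the expected output keeps only rows whose
        // flag is true; the same boolean mask is replayed batch-by-batch through an
        // ArrowPredicateFn below.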
2815        let mut expected_data = match opts.row_filter {
2816            Some(filter) => {
2817                let expected_data = expected_data
2818                    .into_iter()
2819                    .zip(filter.iter())
2820                    .filter_map(|(d, f)| f.then_some(d))
2821                    .collect();
2822
2823                let mut filter_offset = 0;
2824                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2825                    ProjectionMask::all(),
2826                    move |b| {
2827                        let array = BooleanArray::from_iter(
2828                            filter
2829                                .iter()
2830                                .skip(filter_offset)
2831                                .take(b.num_rows())
2832                                .map(|x| Some(*x)),
2833                        );
2834                        filter_offset += b.num_rows();
2835                        Ok(array)
2836                    },
2837                ))]);
2838
2839                builder = builder.with_row_filter(filter);
2840                expected_data
2841            }
2842            None => expected_data,
2843        };
2844
2845        if let Some(offset) = opts.offset {
2846            builder = builder.with_offset(offset);
2847            expected_data = expected_data.into_iter().skip(offset).collect();
2848        }
2849
2850        if let Some(limit) = opts.limit {
2851            builder = builder.with_limit(limit);
2852            expected_data = expected_data.into_iter().take(limit).collect();
2853        }
2854
2855        let mut record_reader = builder
2856            .with_batch_size(opts.record_batch_size)
2857            .build()
2858            .unwrap();
2859
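        // Read the file back in `record_batch_size` chunks and compare each batch
        // against the corresponding slice of the expected data.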
2860        let mut total_read = 0;
2861        loop {
2862            let maybe_batch = record_reader.next();
2863            if total_read < expected_data.len() {
2864                let end = min(total_read + opts.record_batch_size, expected_data.len());
2865                let batch = maybe_batch.unwrap().unwrap();
2866                assert_eq!(end - total_read, batch.num_rows());
2867
2868                let a = converter(&expected_data[total_read..end]);
2869                let b = Arc::clone(batch.column(0));
2870
2871                assert_eq!(a.data_type(), b.data_type());
2872                assert_eq!(a.to_data(), b.to_data());
2873                assert_eq!(
2874                    a.as_any().type_id(),
2875                    b.as_any().type_id(),
2876                    "incorrect type ids"
2877                );
2878
2879                total_read = end;
2880            } else {
2881                assert!(maybe_batch.is_none());
2882                break;
2883            }
2884        }
2885    }
2886
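    /// Flattens the per-row-group values into a single `Vec<Option<T::T>>`,
    /// interleaving `None` wherever the definition levels mark a null.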
2887    fn gen_expected_data<T: DataType>(
2888        def_levels: Option<&Vec<Vec<i16>>>,
2889        values: &[Vec<T::T>],
2890    ) -> Vec<Option<T::T>> {
2891        let data: Vec<Option<T::T>> = match def_levels {
2892            Some(levels) => {
2893                let mut values_iter = values.iter().flatten();
2894                levels
2895                    .iter()
2896                    .flatten()
2897                    .map(|d| match d {
2898                        1 => Some(values_iter.next().cloned().unwrap()),
2899                        0 => None,
2900                        _ => unreachable!(),
2901                    })
2902                    .collect()
2903            }
2904            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2905        };
2906        data
2907    }
2908
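    /// Writes `values` (one `Vec` per row group) to `file` as a single-column
    /// parquet file using the writer settings from `opts`. If an Arrow `field`
    /// is supplied it is embedded as Arrow schema metadata.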
2909    fn generate_single_column_file_with_data<T: DataType>(
2910        values: &[Vec<T::T>],
2911        def_levels: Option<&Vec<Vec<i16>>>,
2912        file: File,
2913        schema: TypePtr,
2914        field: Option<Field>,
2915        opts: &TestOptions,
2916    ) -> Result<crate::format::FileMetaData> {
2917        let mut writer_props = opts.writer_props();
2918        if let Some(field) = field {
2919            let arrow_schema = Schema::new(vec![field]);
2920            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2921        }
2922
2923        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2924
2925        for (idx, v) in values.iter().enumerate() {
2926            let def_levels = def_levels.map(|d| d[idx].as_slice());
2927            let mut row_group_writer = writer.next_row_group()?;
2928            {
2929                let mut column_writer = row_group_writer
2930                    .next_column()?
2931                    .expect("Column writer is none!");
2932
2933                column_writer
2934                    .typed::<T>()
2935                    .write_batch(v, def_levels, None)?;
2936
2937                column_writer.close()?;
2938            }
2939            row_group_writer.close()?;
2940        }
2941
2942        writer.close()
2943    }
2944
2945    fn get_test_file(file_name: &str) -> File {
2946        let mut path = PathBuf::new();
2947        path.push(arrow::util::test_util::arrow_test_data());
2948        path.push(file_name);
2949
2950        File::open(path.as_path()).expect("File not found!")
2951    }
2952
2953    #[test]
2954    fn test_read_structs() {
2955        // This particular test file has columns of struct types where there is
2956        // a column that has the same name as one of the struct fields
2957        // (see: ARROW-11452)
2958        let testdata = arrow::util::test_util::parquet_test_data();
2959        let path = format!("{testdata}/nested_structs.rust.parquet");
2960        let file = File::open(&path).unwrap();
2961        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2962
2963        for batch in record_batch_reader {
2964            batch.unwrap();
2965        }
2966
2967        let file = File::open(&path).unwrap();
2968        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2969
2970        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2971        let projected_reader = builder
2972            .with_projection(mask)
2973            .with_batch_size(60)
2974            .build()
2975            .unwrap();
2976
2977        let expected_schema = Schema::new(vec![
2978            Field::new(
2979                "roll_num",
2980                ArrowDataType::Struct(Fields::from(vec![Field::new(
2981                    "count",
2982                    ArrowDataType::UInt64,
2983                    false,
2984                )])),
2985                false,
2986            ),
2987            Field::new(
2988                "PC_CUR",
2989                ArrowDataType::Struct(Fields::from(vec![
2990                    Field::new("mean", ArrowDataType::Int64, false),
2991                    Field::new("sum", ArrowDataType::Int64, false),
2992                ])),
2993                false,
2994            ),
2995        ]);
2996
2997        // Tests for #1652 and #1654
2998        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2999
3000        for batch in projected_reader {
3001            let batch = batch.unwrap();
3002            assert_eq!(batch.schema().as_ref(), &expected_schema);
3003        }
3004    }
3005
3006    #[test]
3007    // same as test_read_structs but constructs projection mask via column names
3008    fn test_read_structs_by_name() {
3009        let testdata = arrow::util::test_util::parquet_test_data();
3010        let path = format!("{testdata}/nested_structs.rust.parquet");
3011        let file = File::open(&path).unwrap();
3012        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3013
3014        for batch in record_batch_reader {
3015            batch.unwrap();
3016        }
3017
3018        let file = File::open(&path).unwrap();
3019        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3020
3021        let mask = ProjectionMask::columns(
3022            builder.parquet_schema(),
3023            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3024        );
3025        let projected_reader = builder
3026            .with_projection(mask)
3027            .with_batch_size(60)
3028            .build()
3029            .unwrap();
3030
3031        let expected_schema = Schema::new(vec![
3032            Field::new(
3033                "roll_num",
3034                ArrowDataType::Struct(Fields::from(vec![Field::new(
3035                    "count",
3036                    ArrowDataType::UInt64,
3037                    false,
3038                )])),
3039                false,
3040            ),
3041            Field::new(
3042                "PC_CUR",
3043                ArrowDataType::Struct(Fields::from(vec![
3044                    Field::new("mean", ArrowDataType::Int64, false),
3045                    Field::new("sum", ArrowDataType::Int64, false),
3046                ])),
3047                false,
3048            ),
3049        ]);
3050
3051        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3052
3053        for batch in projected_reader {
3054            let batch = batch.unwrap();
3055            assert_eq!(batch.schema().as_ref(), &expected_schema);
3056        }
3057    }
3058
3059    #[test]
3060    fn test_read_maps() {
3061        let testdata = arrow::util::test_util::parquet_test_data();
3062        let path = format!("{testdata}/nested_maps.snappy.parquet");
3063        let file = File::open(path).unwrap();
3064        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3065
3066        for batch in record_batch_reader {
3067            batch.unwrap();
3068        }
3069    }
3070
3071    #[test]
3072    fn test_nested_nullability() {
3073        let message_type = "message nested {
3074          OPTIONAL Group group {
3075            REQUIRED INT32 leaf;
3076          }
3077        }";
3078
3079        let file = tempfile::tempfile().unwrap();
3080        let schema = Arc::new(parse_message_type(message_type).unwrap());
3081
3082        {
3083            // Write using low-level parquet API (#1167)
3084            let mut writer =
3085                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3086                    .unwrap();
3087
3088            {
3089                let mut row_group_writer = writer.next_row_group().unwrap();
3090                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3091
3092                column_writer
3093                    .typed::<Int32Type>()
3094                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3095                    .unwrap();
3096
3097                column_writer.close().unwrap();
3098                row_group_writer.close().unwrap();
3099            }
3100
3101            writer.close().unwrap();
3102        }
3103
3104        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3105        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3106
3107        let reader = builder.with_projection(mask).build().unwrap();
3108
3109        let expected_schema = Schema::new(Fields::from(vec![Field::new(
3110            "group",
3111            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3112            true,
3113        )]));
3114
3115        let batch = reader.into_iter().next().unwrap().unwrap();
3116        assert_eq!(batch.schema().as_ref(), &expected_schema);
3117        assert_eq!(batch.num_rows(), 4);
3118        assert_eq!(batch.column(0).null_count(), 2);
3119    }
3120
3121    #[test]
3122    fn test_invalid_utf8() {
3123        // a parquet file with a single column containing invalid utf8
3124        let data = vec![
3125            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3126            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3127            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3128            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3129            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3130            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3131            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3132            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3133            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3134            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3135            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3136            82, 49,
3137        ];
3138
3139        let file = Bytes::from(data);
3140        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3141
3142        let error = record_batch_reader.next().unwrap().unwrap_err();
3143
3144        assert!(
3145            error.to_string().contains("invalid utf-8 sequence"),
3146            "{}",
3147            error
3148        );
3149    }
3150
3151    #[test]
3152    fn test_invalid_utf8_string_array() {
3153        test_invalid_utf8_string_array_inner::<i32>();
3154    }
3155
3156    #[test]
3157    fn test_invalid_utf8_large_string_array() {
3158        test_invalid_utf8_string_array_inner::<i64>();
3159    }
3160
3161    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3162        let cases = [
3163            invalid_utf8_first_char::<O>(),
3164            invalid_utf8_first_char_long_strings::<O>(),
3165            invalid_utf8_later_char::<O>(),
3166            invalid_utf8_later_char_long_strings::<O>(),
3167            invalid_utf8_later_char_really_long_strings::<O>(),
3168            invalid_utf8_later_char_really_long_strings2::<O>(),
3169        ];
3170        for array in &cases {
3171            for encoding in STRING_ENCODINGS {
3172                // The data is not valid UTF-8, so we cannot safely construct a correct
3173                // StringArray; purposely create an invalid one instead
3174                let array = unsafe {
3175                    GenericStringArray::<O>::new_unchecked(
3176                        array.offsets().clone(),
3177                        array.values().clone(),
3178                        array.nulls().cloned(),
3179                    )
3180                };
3181                let data_type = array.data_type().clone();
3182                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3183                let err = read_from_parquet(data).unwrap_err();
3184                let expected_err =
3185                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3186                assert!(
3187                    err.to_string().contains(expected_err),
3188                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3189                );
3190            }
3191        }
3192    }
3193
3194    #[test]
3195    fn test_invalid_utf8_string_view_array() {
3196        let cases = [
3197            invalid_utf8_first_char::<i32>(),
3198            invalid_utf8_first_char_long_strings::<i32>(),
3199            invalid_utf8_later_char::<i32>(),
3200            invalid_utf8_later_char_long_strings::<i32>(),
3201            invalid_utf8_later_char_really_long_strings::<i32>(),
3202            invalid_utf8_later_char_really_long_strings2::<i32>(),
3203        ];
3204
3205        for encoding in STRING_ENCODINGS {
3206            for array in &cases {
3207                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3208                let array = array.as_binary_view();
3209
3210                // The data is not valid UTF-8, so we cannot safely construct a correct
3211                // StringViewArray; purposely create an invalid one instead
3212                let array = unsafe {
3213                    StringViewArray::new_unchecked(
3214                        array.views().clone(),
3215                        array.data_buffers().to_vec(),
3216                        array.nulls().cloned(),
3217                    )
3218                };
3219
3220                let data_type = array.data_type().clone();
3221                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3222                let err = read_from_parquet(data).unwrap_err();
3223                let expected_err =
3224                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3225                assert!(
3226                    err.to_string().contains(expected_err),
3227                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3228                );
3229            }
3230        }
3231    }
3232
3233    /// Encodings suitable for string data
3234    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3235        None,
3236        Some(Encoding::PLAIN),
3237        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3238        Some(Encoding::DELTA_BYTE_ARRAY),
3239    ];
3240
3241    /// Invalid UTF-8 sequence in the first character
3242    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3243    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3244
3245    /// Invalid UTF-8 sequence in a character other than the first
3246    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3247    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3248
3249    /// returns a BinaryArray with invalid UTF8 data in the first character
3250    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3251        let valid: &[u8] = b"   ";
3252        let invalid = INVALID_UTF8_FIRST_CHAR;
3253        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3254    }
3255
3256    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3257    /// string larger than 12 bytes which is handled specially when reading
3258    /// `ByteViewArray`s
3259    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3260        let valid: &[u8] = b"   ";
3261        let mut invalid = vec![];
3262        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3263        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3264        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3265    }
3266
3267    /// returns a BinaryArray with invalid UTF8 data in a character other than
3268    /// the first (this is checked in a special codepath)
3269    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3270        let valid: &[u8] = b"   ";
3271        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3272        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3273    }
3274
3275    /// returns a BinaryArray with invalid UTF8 data in a character other than
3276    /// the first in a string larger than 12 bytes which is handled specially
3277    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3278    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3279        let valid: &[u8] = b"   ";
3280        let mut invalid = vec![];
3281        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3282        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3283        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3284    }
3285
3286    /// returns a BinaryArray with invalid UTF8 data in a character other than
3287    /// the first in a string larger than 128 bytes which is handled specially
3288    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3289    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3290        let valid: &[u8] = b"   ";
3291        let mut invalid = vec![];
3292        for _ in 0..10 {
3293            // each instance is 38 bytes
3294            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3295        }
3296        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3297        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3298    }
3299
3300    /// returns a BinaryArray with a small invalid UTF8 value followed later by a
3301    /// long valid value (larger than 128 bytes)
3302    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3303        let valid: &[u8] = b"   ";
3304        let mut valid_long = vec![];
3305        for _ in 0..10 {
3306            // each instance is 38 bytes
3307            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3308        }
3309        let invalid = INVALID_UTF8_LATER_CHAR;
3310        GenericBinaryArray::<O>::from_iter(vec![
3311            None,
3312            Some(valid),
3313            Some(invalid),
3314            None,
3315            Some(&valid_long),
3316            Some(valid),
3317        ])
3318    }
3319
3320    /// writes the array into a single column parquet file with the specified
3321    /// encoding.
3322    ///
3323    /// If no encoding is specified, use default (dictionary) encoding
3324    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3325        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3326        let mut data = vec![];
3327        let schema = batch.schema();
3328        let props = encoding.map(|encoding| {
3329            WriterProperties::builder()
3330                // must disable dictionary encoding for the requested encoding to take effect
3331                .set_dictionary_enabled(false)
3332                .set_encoding(encoding)
3333                .build()
3334        });
3335
3336        {
3337            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3338            writer.write(&batch).unwrap();
3339            writer.flush().unwrap();
3340            writer.close().unwrap();
3341        };
3342        data
3343    }
3344
3345    /// read the parquet file into a record batch
3346    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3347        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3348            .unwrap()
3349            .build()
3350            .unwrap();
3351
3352        reader.collect()
3353    }
3354
3355    #[test]
3356    fn test_dictionary_preservation() {
3357        let fields = vec![Arc::new(
3358            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3359                .with_repetition(Repetition::OPTIONAL)
3360                .with_converted_type(ConvertedType::UTF8)
3361                .build()
3362                .unwrap(),
3363        )];
3364
3365        let schema = Arc::new(
3366            Type::group_type_builder("test_schema")
3367                .with_fields(fields)
3368                .build()
3369                .unwrap(),
3370        );
3371
3372        let dict_type = ArrowDataType::Dictionary(
3373            Box::new(ArrowDataType::Int32),
3374            Box::new(ArrowDataType::Utf8),
3375        );
3376
3377        let arrow_field = Field::new("leaf", dict_type, true);
3378
3379        let mut file = tempfile::tempfile().unwrap();
3380
3381        let values = vec![
3382            vec![
3383                ByteArray::from("hello"),
3384                ByteArray::from("a"),
3385                ByteArray::from("b"),
3386                ByteArray::from("d"),
3387            ],
3388            vec![
3389                ByteArray::from("c"),
3390                ByteArray::from("a"),
3391                ByteArray::from("b"),
3392            ],
3393        ];
3394
3395        let def_levels = vec![
3396            vec![1, 0, 0, 1, 0, 0, 1, 1],
3397            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3398        ];
3399
3400        let opts = TestOptions {
3401            encoding: Encoding::RLE_DICTIONARY,
3402            ..Default::default()
3403        };
3404
3405        generate_single_column_file_with_data::<ByteArrayType>(
3406            &values,
3407            Some(&def_levels),
3408            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3409            schema,
3410            Some(arrow_field),
3411            &opts,
3412        )
3413        .unwrap();
3414
3415        file.rewind().unwrap();
3416
3417        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3418
3419        let batches = record_reader
3420            .collect::<Result<Vec<RecordBatch>, _>>()
3421            .unwrap();
3422
3423        assert_eq!(batches.len(), 6);
3424        assert!(batches.iter().all(|x| x.num_columns() == 1));
3425
3426        let row_counts = batches
3427            .iter()
3428            .map(|x| (x.num_rows(), x.column(0).null_count()))
3429            .collect::<Vec<_>>();
3430
3431        assert_eq!(
3432            row_counts,
3433            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3434        );
3435
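        // The child data of a dictionary array is its values buffer; comparing it
        // across batches reveals whether the reader reused the same dictionary.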
3436        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3437
3438        // First and second batch in same row group -> same dictionary
3439        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3440        // Third batch spans the row group boundary -> computed dictionary
3441        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3442        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3443        // Fourth, fifth and sixth from same row group -> same dictionary
3444        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3445        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3446    }
3447
3448    #[test]
3449    fn test_read_null_list() {
3450        let testdata = arrow::util::test_util::parquet_test_data();
3451        let path = format!("{testdata}/null_list.parquet");
3452        let file = File::open(path).unwrap();
3453        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3454
3455        let batch = record_batch_reader.next().unwrap().unwrap();
3456        assert_eq!(batch.num_rows(), 1);
3457        assert_eq!(batch.num_columns(), 1);
3458        assert_eq!(batch.column(0).len(), 1);
3459
3460        let list = batch
3461            .column(0)
3462            .as_any()
3463            .downcast_ref::<ListArray>()
3464            .unwrap();
3465        assert_eq!(list.len(), 1);
3466        assert!(list.is_valid(0));
3467
3468        let val = list.value(0);
3469        assert_eq!(val.len(), 0);
3470    }
3471
3472    #[test]
3473    fn test_null_schema_inference() {
3474        let testdata = arrow::util::test_util::parquet_test_data();
3475        let path = format!("{testdata}/null_list.parquet");
3476        let file = File::open(path).unwrap();
3477
3478        let arrow_field = Field::new(
3479            "emptylist",
3480            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3481            true,
3482        );
3483
3484        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3485        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3486        let schema = builder.schema();
3487        assert_eq!(schema.fields().len(), 1);
3488        assert_eq!(schema.field(0), &arrow_field);
3489    }
3490
3491    #[test]
3492    fn test_skip_metadata() {
3493        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3494        let field = Field::new("col", col.data_type().clone(), true);
3495
3496        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3497
3498        let metadata = [("key".to_string(), "value".to_string())]
3499            .into_iter()
3500            .collect();
3501
3502        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3503
3504        assert_ne!(schema_with_metadata, schema_without_metadata);
3505
3506        let batch =
3507            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3508
3509        let file = |version: WriterVersion| {
3510            let props = WriterProperties::builder()
3511                .set_writer_version(version)
3512                .build();
3513
3514            let file = tempfile().unwrap();
3515            let mut writer =
3516                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3517                    .unwrap();
3518            writer.write(&batch).unwrap();
3519            writer.close().unwrap();
3520            file
3521        };
3522
3523        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3524
3525        let v1_reader = file(WriterVersion::PARQUET_1_0);
3526        let v2_reader = file(WriterVersion::PARQUET_2_0);
3527
3528        let arrow_reader =
3529            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3530        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3531
3532        let reader =
3533            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3534                .unwrap()
3535                .build()
3536                .unwrap();
3537        assert_eq!(reader.schema(), schema_without_metadata);
3538
3539        let arrow_reader =
3540            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3541        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3542
3543        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3544            .unwrap()
3545            .build()
3546            .unwrap();
3547        assert_eq!(reader.schema(), schema_without_metadata);
3548    }
3549
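    /// Writes the provided `(column name, array)` pairs to a temporary parquet
    /// file and returns the file handle for reading back.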
3550    fn write_parquet_from_iter<I, F>(value: I) -> File
3551    where
3552        I: IntoIterator<Item = (F, ArrayRef)>,
3553        F: AsRef<str>,
3554    {
3555        let batch = RecordBatch::try_from_iter(value).unwrap();
3556        let file = tempfile().unwrap();
3557        let mut writer =
3558            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3559        writer.write(&batch).unwrap();
3560        writer.close().unwrap();
3561        file
3562    }
3563
3564    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3565    where
3566        I: IntoIterator<Item = (F, ArrayRef)>,
3567        F: AsRef<str>,
3568    {
3569        let file = write_parquet_from_iter(value);
3570        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3571        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3572            file.try_clone().unwrap(),
3573            options_with_schema,
3574        );
3575        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3576    }
3577
3578    #[test]
3579    fn test_schema_too_few_columns() {
3580        run_schema_test_with_error(
3581            vec![
3582                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3583                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3584            ],
3585            Arc::new(Schema::new(vec![Field::new(
3586                "int64",
3587                ArrowDataType::Int64,
3588                false,
3589            )])),
3590            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3591        );
3592    }
3593
3594    #[test]
3595    fn test_schema_too_many_columns() {
3596        run_schema_test_with_error(
3597            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3598            Arc::new(Schema::new(vec![
3599                Field::new("int64", ArrowDataType::Int64, false),
3600                Field::new("int32", ArrowDataType::Int32, false),
3601            ])),
3602            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3603        );
3604    }
3605
3606    #[test]
3607    fn test_schema_mismatched_column_names() {
3608        run_schema_test_with_error(
3609            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3610            Arc::new(Schema::new(vec![Field::new(
3611                "other",
3612                ArrowDataType::Int64,
3613                false,
3614            )])),
3615            "Arrow: incompatible arrow schema, expected field named int64 got other",
3616        );
3617    }
3618
3619    #[test]
3620    fn test_schema_incompatible_columns() {
3621        run_schema_test_with_error(
3622            vec![
3623                (
3624                    "col1_invalid",
3625                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3626                ),
3627                (
3628                    "col2_valid",
3629                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3630                ),
3631                (
3632                    "col3_invalid",
3633                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3634                ),
3635            ],
3636            Arc::new(Schema::new(vec![
3637                Field::new("col1_invalid", ArrowDataType::Int32, false),
3638                Field::new("col2_valid", ArrowDataType::Int32, false),
3639                Field::new("col3_invalid", ArrowDataType::Int32, false),
3640            ])),
3641            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3642        );
3643    }
3644
3645    #[test]
3646    fn test_one_incompatible_nested_column() {
3647        let nested_fields = Fields::from(vec![
3648            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3649            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3650        ]);
3651        let nested = StructArray::try_new(
3652            nested_fields,
3653            vec![
3654                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3655                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3656            ],
3657            None,
3658        )
3659        .expect("struct array");
3660        let supplied_nested_fields = Fields::from(vec![
3661            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3662            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3663        ]);
3664        run_schema_test_with_error(
3665            vec![
3666                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3667                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3668                ("nested", Arc::new(nested) as ArrayRef),
3669            ],
3670            Arc::new(Schema::new(vec![
3671                Field::new("col1", ArrowDataType::Int64, false),
3672                Field::new("col2", ArrowDataType::Int32, false),
3673                Field::new(
3674                    "nested",
3675                    ArrowDataType::Struct(supplied_nested_fields),
3676                    false,
3677                ),
3678            ])),
3679            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3680            requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3681            but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3682        );
3683    }
3684
3685    /// Return parquet data with a single column of utf8 strings
3686    fn utf8_parquet() -> Bytes {
3687        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3688        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3689        let props = None;
3690        // write a parquet file with non-nullable strings
3691        let mut parquet_data = vec![];
3692        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3693        writer.write(&batch).unwrap();
3694        writer.close().unwrap();
3695        Bytes::from(parquet_data)
3696    }
3697
3698    #[test]
3699    fn test_schema_error_bad_types() {
3700        // verify incompatible schemas error on read
3701        let parquet_data = utf8_parquet();
3702
3703        // Ask to read it back with an incompatible schema (int vs string)
3704        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3705            "column1",
3706            arrow::datatypes::DataType::Int32,
3707            false,
3708        )]));
3709
3710        // read it back out
3711        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3712        let err =
3713            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3714                .unwrap_err();
3715        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3716    }
3717
3718    #[test]
3719    fn test_schema_error_bad_nullability() {
3720        // verify incompatible schemas error on read
3721        let parquet_data = utf8_parquet();
3722
3723        // Ask to read it back with an incompatible schema (nullability mismatch)
3724        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3725            "column1",
3726            arrow::datatypes::DataType::Utf8,
3727            true,
3728        )]));
3729
3730        // read it back out
3731        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3732        let err =
3733            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3734                .unwrap_err();
3735        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3736    }
3737
3738    #[test]
3739    fn test_read_binary_as_utf8() {
3740        let file = write_parquet_from_iter(vec![
3741            (
3742                "binary_to_utf8",
3743                Arc::new(BinaryArray::from(vec![
3744                    b"one".as_ref(),
3745                    b"two".as_ref(),
3746                    b"three".as_ref(),
3747                ])) as ArrayRef,
3748            ),
3749            (
3750                "large_binary_to_large_utf8",
3751                Arc::new(LargeBinaryArray::from(vec![
3752                    b"one".as_ref(),
3753                    b"two".as_ref(),
3754                    b"three".as_ref(),
3755                ])) as ArrayRef,
3756            ),
3757            (
3758                "binary_view_to_utf8_view",
3759                Arc::new(BinaryViewArray::from(vec![
3760                    b"one".as_ref(),
3761                    b"two".as_ref(),
3762                    b"three".as_ref(),
3763                ])) as ArrayRef,
3764            ),
3765        ]);
3766        let supplied_fields = Fields::from(vec![
3767            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3768            Field::new(
3769                "large_binary_to_large_utf8",
3770                ArrowDataType::LargeUtf8,
3771                false,
3772            ),
3773            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3774        ]);
3775
3776        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3777        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3778            file.try_clone().unwrap(),
3779            options,
3780        )
3781        .expect("reader builder with schema")
3782        .build()
3783        .expect("reader with schema");
3784
3785        let batch = arrow_reader.next().unwrap().unwrap();
3786        assert_eq!(batch.num_columns(), 3);
3787        assert_eq!(batch.num_rows(), 3);
3788        assert_eq!(
3789            batch
3790                .column(0)
3791                .as_string::<i32>()
3792                .iter()
3793                .collect::<Vec<_>>(),
3794            vec![Some("one"), Some("two"), Some("three")]
3795        );
3796
3797        assert_eq!(
3798            batch
3799                .column(1)
3800                .as_string::<i64>()
3801                .iter()
3802                .collect::<Vec<_>>(),
3803            vec![Some("one"), Some("two"), Some("three")]
3804        );
3805
3806        assert_eq!(
3807            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3808            vec![Some("one"), Some("two"), Some("three")]
3809        );
3810    }
3811
3812    #[test]
3813    #[should_panic(expected = "Invalid UTF8 sequence at")]
3814    fn test_read_non_utf8_binary_as_utf8() {
3815        let file = write_parquet_from_iter(vec![(
3816            "non_utf8_binary",
3817            Arc::new(BinaryArray::from(vec![
3818                b"\xDE\x00\xFF".as_ref(),
3819                b"\xDE\x01\xAA".as_ref(),
3820                b"\xDE\x02\xFF".as_ref(),
3821            ])) as ArrayRef,
3822        )]);
3823        let supplied_fields = Fields::from(vec![Field::new(
3824            "non_utf8_binary",
3825            ArrowDataType::Utf8,
3826            false,
3827        )]);
3828
3829        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3830        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3831            file.try_clone().unwrap(),
3832            options,
3833        )
3834        .expect("reader builder with schema")
3835        .build()
3836        .expect("reader with schema");
3837        arrow_reader.next().unwrap().unwrap_err();
3838    }
3839
3840    #[test]
3841    fn test_with_schema() {
3842        let nested_fields = Fields::from(vec![
3843            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3844            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3845        ]);
3846
3847        let nested_arrays: Vec<ArrayRef> = vec![
3848            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3849            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3850        ];
3851
3852        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3853
3854        let file = write_parquet_from_iter(vec![
3855            (
3856                "int32_to_ts_second",
3857                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3858            ),
3859            (
3860                "date32_to_date64",
3861                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3862            ),
3863            ("nested", Arc::new(nested) as ArrayRef),
3864        ]);
3865
3866        let supplied_nested_fields = Fields::from(vec![
3867            Field::new(
3868                "utf8_to_dict",
3869                ArrowDataType::Dictionary(
3870                    Box::new(ArrowDataType::Int32),
3871                    Box::new(ArrowDataType::Utf8),
3872                ),
3873                false,
3874            ),
3875            Field::new(
3876                "int64_to_ts_nano",
3877                ArrowDataType::Timestamp(
3878                    arrow::datatypes::TimeUnit::Nanosecond,
3879                    Some("+10:00".into()),
3880                ),
3881                false,
3882            ),
3883        ]);
3884
3885        let supplied_schema = Arc::new(Schema::new(vec![
3886            Field::new(
3887                "int32_to_ts_second",
3888                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3889                false,
3890            ),
3891            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3892            Field::new(
3893                "nested",
3894                ArrowDataType::Struct(supplied_nested_fields),
3895                false,
3896            ),
3897        ]));
3898
3899        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3900        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3901            file.try_clone().unwrap(),
3902            options,
3903        )
3904        .expect("reader builder with schema")
3905        .build()
3906        .expect("reader with schema");
3907
3908        assert_eq!(arrow_reader.schema(), supplied_schema);
3909        let batch = arrow_reader.next().unwrap().unwrap();
3910        assert_eq!(batch.num_columns(), 3);
3911        assert_eq!(batch.num_rows(), 4);
3912        assert_eq!(
3913            batch
3914                .column(0)
3915                .as_any()
3916                .downcast_ref::<TimestampSecondArray>()
3917                .expect("downcast to timestamp second")
3918                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3919                .map(|v| v.to_string())
3920                .expect("value as datetime"),
3921            "1970-01-01 01:00:00 +01:00"
3922        );
3923        assert_eq!(
3924            batch
3925                .column(1)
3926                .as_any()
3927                .downcast_ref::<Date64Array>()
3928                .expect("downcast to date64")
3929                .value_as_date(0)
3930                .map(|v| v.to_string())
3931                .expect("value as date"),
3932            "1970-01-01"
3933        );
3934
3935        let nested = batch
3936            .column(2)
3937            .as_any()
3938            .downcast_ref::<StructArray>()
3939            .expect("downcast to struct");
3940
3941        let nested_dict = nested
3942            .column(0)
3943            .as_any()
3944            .downcast_ref::<Int32DictionaryArray>()
3945            .expect("downcast to dictionary");
3946
3947        assert_eq!(
3948            nested_dict
3949                .values()
3950                .as_any()
3951                .downcast_ref::<StringArray>()
3952                .expect("downcast to string")
3953                .iter()
3954                .collect::<Vec<_>>(),
3955            vec![Some("a"), Some("b")]
3956        );
3957
3958        assert_eq!(
3959            nested_dict.keys().iter().collect::<Vec<_>>(),
3960            vec![Some(0), Some(0), Some(0), Some(1)]
3961        );
3962
3963        assert_eq!(
3964            nested
3965                .column(1)
3966                .as_any()
3967                .downcast_ref::<TimestampNanosecondArray>()
3968                .expect("downcast to timestamp nanosecond")
3969                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3970                .map(|v| v.to_string())
3971                .expect("value as datetime"),
3972            "1970-01-01 10:00:00.000000001 +10:00"
3973        );
3974    }
3975
3976    #[test]
3977    fn test_empty_projection() {
3978        let testdata = arrow::util::test_util::parquet_test_data();
3979        let path = format!("{testdata}/alltypes_plain.parquet");
3980        let file = File::open(path).unwrap();
3981
3982        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3983        let file_metadata = builder.metadata().file_metadata();
3984        let expected_rows = file_metadata.num_rows() as usize;
3985
3986        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3987        let batch_reader = builder
3988            .with_projection(mask)
3989            .with_batch_size(2)
3990            .build()
3991            .unwrap();
3992
3993        let mut total_rows = 0;
3994        for maybe_batch in batch_reader {
3995            let batch = maybe_batch.unwrap();
3996            total_rows += batch.num_rows();
3997            assert_eq!(batch.num_columns(), 0);
3998            assert!(batch.num_rows() <= 2);
3999        }
4000
4001        assert_eq!(total_rows, expected_rows);
4002    }
4003
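    /// Writes two record batches of `batch_size` list rows using the given
    /// `row_group_size`, then reads them back and checks that each returned
    /// batch contains exactly `batch_size` rows.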
4004    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4005        let schema = Arc::new(Schema::new(vec![Field::new(
4006            "list",
4007            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4008            true,
4009        )]));
4010
4011        let mut buf = Vec::with_capacity(1024);
4012
4013        let mut writer = ArrowWriter::try_new(
4014            &mut buf,
4015            schema.clone(),
4016            Some(
4017                WriterProperties::builder()
4018                    .set_max_row_group_size(row_group_size)
4019                    .build(),
4020            ),
4021        )
4022        .unwrap();
4023        for _ in 0..2 {
4024            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4025            for _ in 0..(batch_size) {
4026                list_builder.append(true);
4027            }
4028            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4029                .unwrap();
4030            writer.write(&batch).unwrap();
4031        }
4032        writer.close().unwrap();
4033
4034        let mut record_reader =
4035            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4036        assert_eq!(
4037            batch_size,
4038            record_reader.next().unwrap().unwrap().num_rows()
4039        );
4040        assert_eq!(
4041            batch_size,
4042            record_reader.next().unwrap().unwrap().num_rows()
4043        );
4044    }
4045
4046    #[test]
4047    fn test_row_group_exact_multiple() {
4048        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4049        test_row_group_batch(8, 8);
4050        test_row_group_batch(10, 8);
4051        test_row_group_batch(8, 10);
4052        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4053        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4054        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4055        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4056        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4057    }
4058
4059    /// Given a RecordBatch containing all the column data, return the expected batches given
4060    /// a `batch_size` and `selection`
4061    fn get_expected_batches(
4062        column: &RecordBatch,
4063        selection: &RowSelection,
4064        batch_size: usize,
4065    ) -> Vec<RecordBatch> {
4066        let mut expected_batches = vec![];
4067
4068        let mut selection: VecDeque<_> = selection.clone().into();
4069        let mut row_offset = 0;
4070        let mut last_start = None;
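        // `last_start` marks the beginning of the current contiguous run of selected
        // rows; hitting a skip (or the end of the input) flushes that run as a single
        // expected batch.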
4071        while row_offset < column.num_rows() && !selection.is_empty() {
4072            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4073            while batch_remaining > 0 && !selection.is_empty() {
4074                let (to_read, skip) = match selection.front_mut() {
4075                    Some(selection) if selection.row_count > batch_remaining => {
4076                        selection.row_count -= batch_remaining;
4077                        (batch_remaining, selection.skip)
4078                    }
4079                    Some(_) => {
4080                        let select = selection.pop_front().unwrap();
4081                        (select.row_count, select.skip)
4082                    }
4083                    None => break,
4084                };
4085
4086                batch_remaining -= to_read;
4087
4088                match skip {
4089                    true => {
4090                        if let Some(last_start) = last_start.take() {
4091                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4092                        }
4093                        row_offset += to_read
4094                    }
4095                    false => {
4096                        last_start.get_or_insert(row_offset);
4097                        row_offset += to_read
4098                    }
4099                }
4100            }
4101        }
4102
4103        if let Some(last_start) = last_start.take() {
4104            expected_batches.push(column.slice(last_start, row_offset - last_start))
4105        }
4106
4107        // Sanity check: all batches except the final one should have `batch_size` rows
4108        for batch in &expected_batches[..expected_batches.len() - 1] {
4109            assert_eq!(batch.num_rows(), batch_size);
4110        }
4111
4112        expected_batches
4113    }
4114
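    /// Builds a `RowSelection` that alternates between selecting and skipping
    /// runs of `step_len` rows until `total_len` rows are covered (starting with
    /// a skip when `skip_first` is true), and returns it together with the
    /// number of selected rows.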
4115    fn create_test_selection(
4116        step_len: usize,
4117        total_len: usize,
4118        skip_first: bool,
4119    ) -> (RowSelection, usize) {
4120        let mut remaining = total_len;
4121        let mut skip = skip_first;
4122        let mut vec = vec![];
4123        let mut selected_count = 0;
4124        while remaining != 0 {
4125            let step = if remaining > step_len {
4126                step_len
4127            } else {
4128                remaining
4129            };
4130            vec.push(RowSelector {
4131                row_count: step,
4132                skip,
4133            });
4134            remaining -= step;
4135            if !skip {
4136                selected_count += step;
4137            }
4138            skip = !skip;
4139        }
4140        (vec.into(), selected_count)
4141    }
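
    // A small sketch of what `create_test_selection` produces: alternating select/skip runs
    // of `step_len` rows over `total_len` rows, plus the count of selected rows.
    // This example test (and its name) is an illustrative addition, not part of the original suite.
    #[test]
    fn test_create_test_selection_example() {
        let (selection, selected) = create_test_selection(2, 5, false);
        let expected: RowSelection = vec![
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(1),
        ]
        .into();
        assert_eq!(selection, expected);
        assert_eq!(selected, 3);
    }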
4142
4143    #[test]
4144    fn test_scan_row_with_selection() {
4145        let testdata = arrow::util::test_util::parquet_test_data();
4146        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4147        let test_file = File::open(&path).unwrap();
4148
4149        let mut serial_reader =
4150            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4151        let data = serial_reader.next().unwrap().unwrap();
4152
4153        let do_test = |batch_size: usize, selection_len: usize| {
4154            for skip_first in [false, true] {
4155                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4156
4157                let expected = get_expected_batches(&data, &selections, batch_size);
4158                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4159                assert_eq!(
4160                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4161                    expected,
4162                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4163                );
4164            }
4165        };
4166
4167        // total row count 7300
4168        // 1. test selection len more than one page row count
4169        do_test(1000, 1000);
4170
4171        // 2. test selection len less than one page row count
4172        do_test(20, 20);
4173
4174        // 3. test selection_len less than batch_size
4175        do_test(20, 5);
4176
4177        // 4. test selection_len more than batch_size
4178        //    (i.e. batch_size < selection_len)
4179        do_test(5, 20);
4180
4181        fn create_skip_reader(
4182            test_file: &File,
4183            batch_size: usize,
4184            selections: RowSelection,
4185        ) -> ParquetRecordBatchReader {
4186            let options = ArrowReaderOptions::new().with_page_index(true);
4187            let file = test_file.try_clone().unwrap();
4188            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4189                .unwrap()
4190                .with_batch_size(batch_size)
4191                .with_row_selection(selections)
4192                .build()
4193                .unwrap()
4194        }
4195    }
4196
4197    #[test]
4198    fn test_batch_size_overallocate() {
4199        let testdata = arrow::util::test_util::parquet_test_data();
4200        // `alltypes_plain.parquet` only has 8 rows
4201        let path = format!("{testdata}/alltypes_plain.parquet");
4202        let test_file = File::open(path).unwrap();
4203
4204        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4205        let num_rows = builder.metadata.file_metadata().num_rows();
4206        let reader = builder
4207            .with_batch_size(1024)
4208            .with_projection(ProjectionMask::all())
4209            .build()
4210            .unwrap();
4211        assert_ne!(1024, num_rows);
4212        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4213    }
4214
4215    #[test]
4216    fn test_read_with_page_index_enabled() {
4217        let testdata = arrow::util::test_util::parquet_test_data();
4218
4219        {
4220            // `alltypes_tiny_pages.parquet` has page index
4221            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4222            let test_file = File::open(path).unwrap();
4223            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4224                test_file,
4225                ArrowReaderOptions::new().with_page_index(true),
4226            )
4227            .unwrap();
4228            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4229            let reader = builder.build().unwrap();
4230            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4231            assert_eq!(batches.len(), 8);
4232        }
4233
4234        {
4235            // `alltypes_plain.parquet` doesn't have page index
4236            let path = format!("{testdata}/alltypes_plain.parquet");
4237            let test_file = File::open(path).unwrap();
4238            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4239                test_file,
4240                ArrowReaderOptions::new().with_page_index(true),
4241            )
4242            .unwrap();
4243            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4244            // the file should still be read successfully.
4245            assert!(builder.metadata().offset_index().is_none());
4246            let reader = builder.build().unwrap();
4247            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4248            assert_eq!(batches.len(), 1);
4249        }
4250    }
4251
4252    #[test]
4253    fn test_raw_repetition() {
4254        const MESSAGE_TYPE: &str = "
4255            message Log {
4256              OPTIONAL INT32 eventType;
4257              REPEATED INT32 category;
4258              REPEATED group filter {
4259                OPTIONAL INT32 error;
4260              }
4261            }
4262        ";
4263        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4264        let props = Default::default();
4265
4266        let mut buf = Vec::with_capacity(1024);
4267        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4268        let mut row_group_writer = writer.next_row_group().unwrap();
4269
4270        // column 0
4271        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4272        col_writer
4273            .typed::<Int32Type>()
4274            .write_batch(&[1], Some(&[1]), None)
4275            .unwrap();
4276        col_writer.close().unwrap();
4277        // column 1
4278        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4279        col_writer
4280            .typed::<Int32Type>()
4281            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4282            .unwrap();
4283        col_writer.close().unwrap();
4284        // column 2
4285        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4286        col_writer
4287            .typed::<Int32Type>()
4288            .write_batch(&[1], Some(&[1]), Some(&[0]))
4289            .unwrap();
4290        col_writer.close().unwrap();
4291
4292        let rg_md = row_group_writer.close().unwrap();
4293        assert_eq!(rg_md.num_rows(), 1);
4294        writer.close().unwrap();
4295
4296        let bytes = Bytes::from(buf);
4297
4298        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4299        let full = no_mask.next().unwrap().unwrap();
4300
4301        assert_eq!(full.num_columns(), 3);
4302
4303        for idx in 0..3 {
4304            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4305            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4306            let mut reader = b.with_projection(mask).build().unwrap();
4307            let projected = reader.next().unwrap().unwrap();
4308
4309            assert_eq!(projected.num_columns(), 1);
4310            assert_eq!(full.column(idx), projected.column(0));
4311        }
4312    }
4313
4314    #[test]
4315    fn test_read_lz4_raw() {
4316        let testdata = arrow::util::test_util::parquet_test_data();
4317        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4318        let file = File::open(path).unwrap();
4319
4320        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4321            .unwrap()
4322            .collect::<Result<Vec<_>, _>>()
4323            .unwrap();
4324        assert_eq!(batches.len(), 1);
4325        let batch = &batches[0];
4326
4327        assert_eq!(batch.num_columns(), 3);
4328        assert_eq!(batch.num_rows(), 4);
4329
4330        // https://github.com/apache/parquet-testing/pull/18
4331        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4332        assert_eq!(
4333            a.values(),
4334            &[1593604800, 1593604800, 1593604801, 1593604801]
4335        );
4336
4337        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4338        let a: Vec<_> = a.iter().flatten().collect();
4339        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4340
4341        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4342        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4343    }
4344
4345    // This test ensures backward compatibility. It reads 2 files that both declare the LZ4
4346    // CompressionCodec but were compressed with different algorithms: LZ4_HADOOP and LZ4_RAW.
4347    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4348    //    LZ4_HADOOP algorithm for compression.
4349    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4350    //    LZ4_RAW algorithm for compression. Reading it relies on the LZ4_RAW fallback kept for
4351    //    backward compatibility with older parquet-cpp versions.
4352    //
4353    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4354    #[test]
4355    fn test_read_lz4_hadoop_fallback() {
4356        for file in [
4357            "hadoop_lz4_compressed.parquet",
4358            "non_hadoop_lz4_compressed.parquet",
4359        ] {
4360            let testdata = arrow::util::test_util::parquet_test_data();
4361            let path = format!("{testdata}/{file}");
4362            let file = File::open(path).unwrap();
4363            let expected_rows = 4;
4364
4365            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4366                .unwrap()
4367                .collect::<Result<Vec<_>, _>>()
4368                .unwrap();
4369            assert_eq!(batches.len(), 1);
4370            let batch = &batches[0];
4371
4372            assert_eq!(batch.num_columns(), 3);
4373            assert_eq!(batch.num_rows(), expected_rows);
4374
4375            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4376            assert_eq!(
4377                a.values(),
4378                &[1593604800, 1593604800, 1593604801, 1593604801]
4379            );
4380
4381            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4382            let b: Vec<_> = b.iter().flatten().collect();
4383            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4384
4385            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4386            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4387        }
4388    }
4389
4390    #[test]
4391    fn test_read_lz4_hadoop_large() {
4392        let testdata = arrow::util::test_util::parquet_test_data();
4393        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4394        let file = File::open(path).unwrap();
4395        let expected_rows = 10000;
4396
4397        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4398            .unwrap()
4399            .collect::<Result<Vec<_>, _>>()
4400            .unwrap();
4401        assert_eq!(batches.len(), 1);
4402        let batch = &batches[0];
4403
4404        assert_eq!(batch.num_columns(), 1);
4405        assert_eq!(batch.num_rows(), expected_rows);
4406
4407        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4408        let a: Vec<_> = a.iter().flatten().collect();
4409        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4410        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4411        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4412        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4413    }
4414
4415    #[test]
4416    #[cfg(feature = "snap")]
4417    fn test_read_nested_lists() {
4418        let testdata = arrow::util::test_util::parquet_test_data();
4419        let path = format!("{testdata}/nested_lists.snappy.parquet");
4420        let file = File::open(path).unwrap();
4421
4422        let f = file.try_clone().unwrap();
4423        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4424        let expected = reader.next().unwrap().unwrap();
4425        assert_eq!(expected.num_rows(), 3);
4426
4427        let selection = RowSelection::from(vec![
4428            RowSelector::skip(1),
4429            RowSelector::select(1),
4430            RowSelector::skip(1),
4431        ]);
4432        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4433            .unwrap()
4434            .with_row_selection(selection)
4435            .build()
4436            .unwrap();
4437
4438        let actual = reader.next().unwrap().unwrap();
4439        assert_eq!(actual.num_rows(), 1);
4440        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4441    }
4442
4443    #[test]
4444    fn test_arbitrary_decimal() {
4445        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4446        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4447            .with_precision_and_scale(19, 0)
4448            .unwrap();
4449        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4450            .with_precision_and_scale(12, 0)
4451            .unwrap();
4452        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4453            .with_precision_and_scale(17, 10)
4454            .unwrap();
4455
4456        let written = RecordBatch::try_from_iter([
4457            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4458            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4459            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4460        ])
4461        .unwrap();
4462
4463        let mut buffer = Vec::with_capacity(1024);
4464        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4465        writer.write(&written).unwrap();
4466        writer.close().unwrap();
4467
4468        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4469            .unwrap()
4470            .collect::<Result<Vec<_>, _>>()
4471            .unwrap();
4472
4473        assert_eq!(&written.slice(0, 8), &read[0]);
4474    }
4475
4476    #[test]
4477    fn test_list_skip() {
4478        let mut list = ListBuilder::new(Int32Builder::new());
4479        list.append_value([Some(1), Some(2)]);
4480        list.append_value([Some(3)]);
4481        list.append_value([Some(4)]);
4482        let list = list.finish();
4483        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4484
4485        // First page contains 2 values but only 1 row
4486        let props = WriterProperties::builder()
4487            .set_data_page_row_count_limit(1)
4488            .set_write_batch_size(2)
4489            .build();
4490
4491        let mut buffer = Vec::with_capacity(1024);
4492        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4493        writer.write(&batch).unwrap();
4494        writer.close().unwrap();
4495
4496        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4497        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4498            .unwrap()
4499            .with_row_selection(selection.into())
4500            .build()
4501            .unwrap();
4502        let out = reader.next().unwrap().unwrap();
4503        assert_eq!(out.num_rows(), 1);
4504        assert_eq!(out, batch.slice(2, 1));
4505    }
4506
4507    fn test_decimal32_roundtrip() {
4508        let d = |values: Vec<i32>, p: u8| {
4509            let iter = values.into_iter();
4510            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4511                .with_precision_and_scale(p, 2)
4512                .unwrap()
4513        };
4514
4515        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4516        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4517
4518        let mut buffer = Vec::with_capacity(1024);
4519        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4520        writer.write(&batch).unwrap();
4521        writer.close().unwrap();
4522
4523        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4524        let t1 = builder.parquet_schema().columns()[0].physical_type();
4525        assert_eq!(t1, PhysicalType::INT32);
4526
4527        let mut reader = builder.build().unwrap();
4528        assert_eq!(batch.schema(), reader.schema());
4529
4530        let out = reader.next().unwrap().unwrap();
4531        assert_eq!(batch, out);
4532    }
4533
4534    fn test_decimal64_roundtrip() {
4535        // Precision <= 9 -> INT32
4536        // Precision <= 18 -> INT64
4537
4538        let d = |values: Vec<i64>, p: u8| {
4539            let iter = values.into_iter();
4540            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4541                .with_precision_and_scale(p, 2)
4542                .unwrap()
4543        };
4544
4545        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4546        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4547        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4548
4549        let batch = RecordBatch::try_from_iter([
4550            ("d1", Arc::new(d1) as ArrayRef),
4551            ("d2", Arc::new(d2) as ArrayRef),
4552            ("d3", Arc::new(d3) as ArrayRef),
4553        ])
4554        .unwrap();
4555
4556        let mut buffer = Vec::with_capacity(1024);
4557        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4558        writer.write(&batch).unwrap();
4559        writer.close().unwrap();
4560
4561        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4562        let t1 = builder.parquet_schema().columns()[0].physical_type();
4563        assert_eq!(t1, PhysicalType::INT32);
4564        let t2 = builder.parquet_schema().columns()[1].physical_type();
4565        assert_eq!(t2, PhysicalType::INT64);
4566        let t3 = builder.parquet_schema().columns()[2].physical_type();
4567        assert_eq!(t3, PhysicalType::INT64);
4568
4569        let mut reader = builder.build().unwrap();
4570        assert_eq!(batch.schema(), reader.schema());
4571
4572        let out = reader.next().unwrap().unwrap();
4573        assert_eq!(batch, out);
4574    }
4575
4576    fn test_decimal_roundtrip<T: DecimalType>() {
4577        // Precision <= 9 -> INT32
4578        // Precision <= 18 -> INT64
4579        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4580
4581        let d = |values: Vec<usize>, p: u8| {
4582            let iter = values.into_iter().map(T::Native::usize_as);
4583            PrimitiveArray::<T>::from_iter_values(iter)
4584                .with_precision_and_scale(p, 2)
4585                .unwrap()
4586        };
4587
4588        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4589        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4590        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4591        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4592
4593        let batch = RecordBatch::try_from_iter([
4594            ("d1", Arc::new(d1) as ArrayRef),
4595            ("d2", Arc::new(d2) as ArrayRef),
4596            ("d3", Arc::new(d3) as ArrayRef),
4597            ("d4", Arc::new(d4) as ArrayRef),
4598        ])
4599        .unwrap();
4600
4601        let mut buffer = Vec::with_capacity(1024);
4602        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4603        writer.write(&batch).unwrap();
4604        writer.close().unwrap();
4605
4606        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4607        let t1 = builder.parquet_schema().columns()[0].physical_type();
4608        assert_eq!(t1, PhysicalType::INT32);
4609        let t2 = builder.parquet_schema().columns()[1].physical_type();
4610        assert_eq!(t2, PhysicalType::INT64);
4611        let t3 = builder.parquet_schema().columns()[2].physical_type();
4612        assert_eq!(t3, PhysicalType::INT64);
4613        let t4 = builder.parquet_schema().columns()[3].physical_type();
4614        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4615
4616        let mut reader = builder.build().unwrap();
4617        assert_eq!(batch.schema(), reader.schema());
4618
4619        let out = reader.next().unwrap().unwrap();
4620        assert_eq!(batch, out);
4621    }
4622
4623    #[test]
4624    fn test_decimal() {
4625        test_decimal32_roundtrip();
4626        test_decimal64_roundtrip();
4627        test_decimal_roundtrip::<Decimal128Type>();
4628        test_decimal_roundtrip::<Decimal256Type>();
4629    }
4630
4631    #[test]
4632    fn test_list_selection() {
4633        let schema = Arc::new(Schema::new(vec![Field::new_list(
4634            "list",
4635            Field::new_list_field(ArrowDataType::Utf8, true),
4636            false,
4637        )]));
4638        let mut buf = Vec::with_capacity(1024);
4639
4640        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4641
4642        for i in 0..2 {
4643            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4644            for j in 0..1024 {
4645                list_a_builder.values().append_value(format!("{i} {j}"));
4646                list_a_builder.append(true);
4647            }
4648            let batch =
4649                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4650                    .unwrap();
4651            writer.write(&batch).unwrap();
4652        }
4653        let _metadata = writer.close().unwrap();
4654
4655        let buf = Bytes::from(buf);
4656        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4657            .unwrap()
4658            .with_row_selection(RowSelection::from(vec![
4659                RowSelector::skip(100),
4660                RowSelector::select(924),
4661                RowSelector::skip(100),
4662                RowSelector::select(924),
4663            ]))
4664            .build()
4665            .unwrap();
4666
4667        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4668        let batch = concat_batches(&schema, &batches).unwrap();
4669
4670        assert_eq!(batch.num_rows(), 924 * 2);
4671        let list = batch.column(0).as_list::<i32>();
4672
4673        for w in list.value_offsets().windows(2) {
4674            assert_eq!(w[0] + 1, w[1])
4675        }
4676        let mut values = list.values().as_string::<i32>().iter();
4677
4678        for i in 0..2 {
4679            for j in 100..1024 {
4680                let expected = format!("{i} {j}");
4681                assert_eq!(values.next().unwrap().unwrap(), &expected);
4682            }
4683        }
4684    }
4685
4686    #[test]
4687    fn test_list_selection_fuzz() {
4688        let mut rng = rng();
4689        let schema = Arc::new(Schema::new(vec![Field::new_list(
4690            "list",
4691            Field::new_list(
4692                Field::LIST_FIELD_DEFAULT_NAME,
4693                Field::new_list_field(ArrowDataType::Int32, true),
4694                true,
4695            ),
4696            true,
4697        )]));
4698        let mut buf = Vec::with_capacity(1024);
4699        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4700
4701        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4702
4703        for _ in 0..2048 {
4704            if rng.random_bool(0.2) {
4705                list_a_builder.append(false);
4706                continue;
4707            }
4708
4709            let list_a_len = rng.random_range(0..10);
4710            let list_b_builder = list_a_builder.values();
4711
4712            for _ in 0..list_a_len {
4713                if rng.random_bool(0.2) {
4714                    list_b_builder.append(false);
4715                    continue;
4716                }
4717
4718                let list_b_len = rng.random_range(0..10);
4719                let int_builder = list_b_builder.values();
4720                for _ in 0..list_b_len {
4721                    match rng.random_bool(0.2) {
4722                        true => int_builder.append_null(),
4723                        false => int_builder.append_value(rng.random()),
4724                    }
4725                }
4726                list_b_builder.append(true)
4727            }
4728            list_a_builder.append(true);
4729        }
4730
4731        let array = Arc::new(list_a_builder.finish());
4732        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4733
4734        writer.write(&batch).unwrap();
4735        let _metadata = writer.close().unwrap();
4736
4737        let buf = Bytes::from(buf);
4738
4739        let cases = [
4740            vec![
4741                RowSelector::skip(100),
4742                RowSelector::select(924),
4743                RowSelector::skip(100),
4744                RowSelector::select(924),
4745            ],
4746            vec![
4747                RowSelector::select(924),
4748                RowSelector::skip(100),
4749                RowSelector::select(924),
4750                RowSelector::skip(100),
4751            ],
4752            vec![
4753                RowSelector::skip(1023),
4754                RowSelector::select(1),
4755                RowSelector::skip(1023),
4756                RowSelector::select(1),
4757            ],
4758            vec![
4759                RowSelector::select(1),
4760                RowSelector::skip(1023),
4761                RowSelector::select(1),
4762                RowSelector::skip(1023),
4763            ],
4764        ];
4765
4766        for batch_size in [100, 1024, 2048] {
4767            for selection in &cases {
4768                let selection = RowSelection::from(selection.clone());
4769                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4770                    .unwrap()
4771                    .with_row_selection(selection.clone())
4772                    .with_batch_size(batch_size)
4773                    .build()
4774                    .unwrap();
4775
4776                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4777                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4778                assert_eq!(actual.num_rows(), selection.row_count());
4779
4780                let mut batch_offset = 0;
4781                let mut actual_offset = 0;
4782                for selector in selection.iter() {
4783                    if selector.skip {
4784                        batch_offset += selector.row_count;
4785                        continue;
4786                    }
4787
4788                    assert_eq!(
4789                        batch.slice(batch_offset, selector.row_count),
4790                        actual.slice(actual_offset, selector.row_count)
4791                    );
4792
4793                    batch_offset += selector.row_count;
4794                    actual_offset += selector.row_count;
4795                }
4796            }
4797        }
4798    }
4799
4800    #[test]
4801    fn test_read_old_nested_list() {
4802        use arrow::datatypes::DataType;
4803        use arrow::datatypes::ToByteSlice;
4804
4805        let testdata = arrow::util::test_util::parquet_test_data();
4806        // message my_record {
4807        //     REQUIRED group a (LIST) {
4808        //         REPEATED group array (LIST) {
4809        //             REPEATED INT32 array;
4810        //         }
4811        //     }
4812        // }
4813        // should be read as list<list<int32>>
4814        let path = format!("{testdata}/old_list_structure.parquet");
4815        let test_file = File::open(path).unwrap();
4816
4817        // create expected ListArray
4818        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4819
4820        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4821        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4822
4823        // Construct a list array from the above two
4824        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4825            "array",
4826            DataType::Int32,
4827            false,
4828        ))))
4829        .len(2)
4830        .add_buffer(a_value_offsets)
4831        .add_child_data(a_values.into_data())
4832        .build()
4833        .unwrap();
4834        let a = ListArray::from(a_list_data);
4835
4836        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4837        let mut reader = builder.build().unwrap();
4838        let out = reader.next().unwrap().unwrap();
4839        assert_eq!(out.num_rows(), 1);
4840        assert_eq!(out.num_columns(), 1);
4841        // grab first column
4842        let c0 = out.column(0);
4843        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4844        // get first row: [[1, 2], [3, 4]]
4845        let r0 = c0arr.value(0);
4846        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4847        assert_eq!(r0arr, &a);
4848    }
4849
4850    #[test]
4851    fn test_map_no_value() {
4852        // File schema:
4853        // message schema {
4854        //   required group my_map (MAP) {
4855        //     repeated group key_value {
4856        //       required int32 key;
4857        //       optional int32 value;
4858        //     }
4859        //   }
4860        //   required group my_map_no_v (MAP) {
4861        //     repeated group key_value {
4862        //       required int32 key;
4863        //     }
4864        //   }
4865        //   required group my_list (LIST) {
4866        //     repeated group list {
4867        //       required int32 element;
4868        //     }
4869        //   }
4870        // }
4871        let testdata = arrow::util::test_util::parquet_test_data();
4872        let path = format!("{testdata}/map_no_value.parquet");
4873        let file = File::open(path).unwrap();
4874
4875        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4876            .unwrap()
4877            .build()
4878            .unwrap();
4879        let out = reader.next().unwrap().unwrap();
4880        assert_eq!(out.num_rows(), 3);
4881        assert_eq!(out.num_columns(), 3);
4882        // my_map_no_v and my_list columns should now be equivalent
4883        let c0 = out.column(1).as_list::<i32>();
4884        let c1 = out.column(2).as_list::<i32>();
4885        assert_eq!(c0.len(), c1.len());
4886        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4887    }
4888
4889    #[test]
4890    fn test_get_row_group_column_bloom_filter_with_length() {
4891        // rewrite the test data to a new parquet file that stores bloom_filter_length
4892        let testdata = arrow::util::test_util::parquet_test_data();
4893        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4894        let file = File::open(path).unwrap();
4895        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4896        let schema = builder.schema().clone();
4897        let reader = builder.build().unwrap();
4898
4899        let mut parquet_data = Vec::new();
4900        let props = WriterProperties::builder()
4901            .set_bloom_filter_enabled(true)
4902            .build();
4903        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
4904        for batch in reader {
4905            let batch = batch.unwrap();
4906            writer.write(&batch).unwrap();
4907        }
4908        writer.close().unwrap();
4909
4910        // test the new parquet file
4911        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
4912    }
4913
4914    #[test]
4915    fn test_get_row_group_column_bloom_filter_without_length() {
4916        let testdata = arrow::util::test_util::parquet_test_data();
4917        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4918        let data = Bytes::from(std::fs::read(path).unwrap());
4919        test_get_row_group_column_bloom_filter(data, false);
4920    }
4921
4922    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
4923        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
4924
4925        let metadata = builder.metadata();
4926        assert_eq!(metadata.num_row_groups(), 1);
4927        let row_group = metadata.row_group(0);
4928        let column = row_group.column(0);
4929        assert_eq!(column.bloom_filter_length().is_some(), with_length);
4930
4931        let sbbf = builder
4932            .get_row_group_column_bloom_filter(0, 0)
4933            .unwrap()
4934            .unwrap();
4935        assert!(sbbf.check(&"Hello"));
4936        assert!(!sbbf.check(&"Hello_Not_Exists"));
4937    }
4938}