parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24use arrow_select::filter::prep_null_mask_filter;
25pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
26pub use selection::{RowSelection, RowSelector};
27use std::collections::VecDeque;
28use std::sync::Arc;
29
30pub use crate::arrow::array_reader::RowGroups;
31use crate::arrow::array_reader::{build_array_reader, ArrayReader};
32use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
33use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
34use crate::column::page::{PageIterator, PageReader};
35#[cfg(feature = "encryption")]
36use crate::encryption::decrypt::FileDecryptionProperties;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39use crate::file::reader::{ChunkReader, SerializedPageReader};
40use crate::schema::types::SchemaDescriptor;
41
// Row filtering support ([`RowFilter`], [`ArrowPredicate`], [`ArrowPredicateFn`]),
// re-exported above.
mod filter;
// Row selection support ([`RowSelection`], [`RowSelector`]), re-exported above.
mod selection;
/// Support for extracting statistics for use with the arrow APIs (see module
/// documentation for details).
pub mod statistics;
45
/// Builder for constructing parquet readers into arrow.
///
/// Most users should use one of the following specializations:
///
/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
///
/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
pub struct ArrowReaderBuilder<T> {
    /// The input data source, wrapped by the sync ([`SyncReader`]) or async
    /// specializations of this builder
    pub(crate) input: T,

    /// The parquet metadata for the file being read
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// The arrow schema of the file (prior to projection)
    pub(crate) schema: SchemaRef,

    /// Parsed parquet-to-arrow field information used to build array readers
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Maximum number of rows per produced [`RecordBatch`] (defaults to 1024)
    pub(crate) batch_size: usize,

    /// Optional subset of row group indices to read; `None` reads all row groups
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Columns to read (defaults to all columns)
    pub(crate) projection: ProjectionMask,

    /// Optional row filter, applied after row group selection and row selection
    pub(crate) filter: Option<RowFilter>,

    /// Optional row selection restricting which rows are decoded
    pub(crate) selection: Option<RowSelection>,

    /// Optional limit on the number of rows returned, applied after
    /// selection and filtering
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip, applied after selection and filtering
    pub(crate) offset: Option<usize>,
}
77
78impl<T> ArrowReaderBuilder<T> {
79    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
80        Self {
81            input,
82            metadata: metadata.metadata,
83            schema: metadata.schema,
84            fields: metadata.fields,
85            batch_size: 1024,
86            row_groups: None,
87            projection: ProjectionMask::all(),
88            filter: None,
89            selection: None,
90            limit: None,
91            offset: None,
92        }
93    }
94
95    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
96    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
97        &self.metadata
98    }
99
100    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
101    pub fn parquet_schema(&self) -> &SchemaDescriptor {
102        self.metadata.file_metadata().schema_descr()
103    }
104
105    /// Returns the arrow [`SchemaRef`] for this parquet file
106    pub fn schema(&self) -> &SchemaRef {
107        &self.schema
108    }
109
110    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
111    /// If the batch_size more than the file row count, use the file row count.
112    pub fn with_batch_size(self, batch_size: usize) -> Self {
113        // Try to avoid allocate large buffer
114        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
115        Self { batch_size, ..self }
116    }
117
118    /// Only read data from the provided row group indexes
119    ///
120    /// This is also called row group filtering
121    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
122        Self {
123            row_groups: Some(row_groups),
124            ..self
125        }
126    }
127
128    /// Only read data from the provided column indexes
129    pub fn with_projection(self, mask: ProjectionMask) -> Self {
130        Self {
131            projection: mask,
132            ..self
133        }
134    }
135
136    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
137    /// data into memory.
138    ///
139    /// This feature is used to restrict which rows are decoded within row
140    /// groups, skipping ranges of rows that are not needed. Such selections
141    /// could be determined by evaluating predicates against the parquet page
142    /// [`Index`] or some other external information available to a query
143    /// engine.
144    ///
145    /// # Notes
146    ///
147    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
148    /// applying the row selection, and therefore rows from skipped row groups
149    /// should not be included in the [`RowSelection`] (see example below)
150    ///
151    /// It is recommended to enable writing the page index if using this
152    /// functionality, to allow more efficient skipping over data pages. See
153    /// [`ArrowReaderOptions::with_page_index`].
154    ///
155    /// # Example
156    ///
157    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
158    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
159    /// row group 3:
160    ///
161    /// ```text
162    ///   Row Group 0, 1000 rows (selected)
163    ///   Row Group 1, 1000 rows (skipped)
164    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
165    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
166    /// ```
167    ///
168    /// You could pass the following [`RowSelection`]:
169    ///
170    /// ```text
171    ///  Select 1000    (scan all rows in row group 0)
172    ///  Skip 50        (skip the first 50 rows in row group 2)
173    ///  Select 50      (scan rows 50-100 in row group 2)
174    ///  Skip 900       (skip the remaining rows in row group 2)
175    ///  Skip 200       (skip the first 200 rows in row group 3)
176    ///  Select 100     (scan rows 200-300 in row group 3)
177    ///  Skip 700       (skip the remaining rows in row group 3)
178    /// ```
179    /// Note there is no entry for the (entirely) skipped row group 1.
180    ///
181    /// Note you can represent the same selection with fewer entries. Instead of
182    ///
183    /// ```text
184    ///  Skip 900       (skip the remaining rows in row group 2)
185    ///  Skip 200       (skip the first 200 rows in row group 3)
186    /// ```
187    ///
188    /// you could use
189    ///
190    /// ```text
191    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
192    /// ```
193    ///
194    /// [`Index`]: crate::file::page_index::index::Index
195    pub fn with_row_selection(self, selection: RowSelection) -> Self {
196        Self {
197            selection: Some(selection),
198            ..self
199        }
200    }
201
202    /// Provide a [`RowFilter`] to skip decoding rows
203    ///
204    /// Row filters are applied after row group selection and row selection
205    ///
206    /// It is recommended to enable reading the page index if using this functionality, to allow
207    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
208    pub fn with_row_filter(self, filter: RowFilter) -> Self {
209        Self {
210            filter: Some(filter),
211            ..self
212        }
213    }
214
215    /// Provide a limit to the number of rows to be read
216    ///
217    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
218    /// allowing it to limit the final set of rows decoded after any pushed down predicates
219    ///
220    /// It is recommended to enable reading the page index if using this functionality, to allow
221    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
222    pub fn with_limit(self, limit: usize) -> Self {
223        Self {
224            limit: Some(limit),
225            ..self
226        }
227    }
228
229    /// Provide an offset to skip over the given number of rows
230    ///
231    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
232    /// allowing it to skip rows after any pushed down predicates
233    ///
234    /// It is recommended to enable reading the page index if using this functionality, to allow
235    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
236    pub fn with_offset(self, offset: usize) -> Self {
237        Self {
238            offset: Some(offset),
239            ..self
240        }
241    }
242}
243
/// Options that control how metadata is read for a parquet file
///
/// See [`ArrowReaderBuilder`] for how to configure how the column data
/// is then read from the file, including projection and filter pushdown
///
/// Note all options default to off / unset via [`Default`]
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    skip_arrow_metadata: bool,
    /// If provided used as the schema for the file, otherwise the schema is read from the file
    ///
    /// Setting this (via [`ArrowReaderOptions::with_schema`]) also sets
    /// `skip_arrow_metadata` to `true`
    supplied_schema: Option<SchemaRef>,
    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
    pub(crate) page_index: bool,
    /// If encryption is enabled, the file decryption properties can be provided
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}
260
261impl ArrowReaderOptions {
262    /// Create a new [`ArrowReaderOptions`] with the default settings
263    pub fn new() -> Self {
264        Self::default()
265    }
266
267    /// Skip decoding the embedded arrow metadata (defaults to `false`)
268    ///
269    /// Parquet files generated by some writers may contain embedded arrow
270    /// schema and metadata.
271    /// This may not be correct or compatible with your system,
272    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
273    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
274        Self {
275            skip_arrow_metadata,
276            ..self
277        }
278    }
279
280    /// Provide a schema to use when reading the parquet file. If provided it
281    /// takes precedence over the schema inferred from the file or the schema defined
282    /// in the file's metadata. If the schema is not compatible with the file's
283    /// schema an error will be returned when constructing the builder.
284    ///
285    /// This option is only required if you want to cast columns to a different type.
286    /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
287    /// in the Arrow schema.
288    ///
289    /// The supplied schema must have the same number of columns as the parquet schema and
290    /// the column names need to be the same.
291    ///
292    /// # Example
293    /// ```
294    /// use std::io::Bytes;
295    /// use std::sync::Arc;
296    /// use tempfile::tempfile;
297    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
298    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
299    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
300    /// use parquet::arrow::ArrowWriter;
301    ///
302    /// // Write data - schema is inferred from the data to be Int32
303    /// let file = tempfile().unwrap();
304    /// let batch = RecordBatch::try_from_iter(vec![
305    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
306    /// ]).unwrap();
307    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
308    /// writer.write(&batch).unwrap();
309    /// writer.close().unwrap();
310    ///
311    /// // Read the file back.
312    /// // Supply a schema that interprets the Int32 column as a Timestamp.
313    /// let supplied_schema = Arc::new(Schema::new(vec![
314    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
315    /// ]));
316    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
317    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
318    ///     file.try_clone().unwrap(),
319    ///     options
320    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
321    ///
322    /// // Create the reader and read the data using the supplied schema.
323    /// let mut reader = builder.build().unwrap();
324    /// let _batch = reader.next().unwrap().unwrap();
325    /// ```
326    pub fn with_schema(self, schema: SchemaRef) -> Self {
327        Self {
328            supplied_schema: Some(schema),
329            skip_arrow_metadata: true,
330            ..self
331        }
332    }
333
334    /// Enable reading [`PageIndex`], if present (defaults to `false`)
335    ///
336    /// The `PageIndex` can be used to push down predicates to the parquet scan,
337    /// potentially eliminating unnecessary IO, by some query engines.
338    ///
339    /// If this is enabled, [`ParquetMetaData::column_index`] and
340    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
341    /// information is present in the file.
342    ///
343    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
344    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
345    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
346    pub fn with_page_index(self, page_index: bool) -> Self {
347        Self { page_index, ..self }
348    }
349
350    /// Provide the file decryption properties to use when reading encrypted parquet files.
351    ///
352    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
353    #[cfg(feature = "encryption")]
354    pub fn with_file_decryption_properties(
355        self,
356        file_decryption_properties: FileDecryptionProperties,
357    ) -> Self {
358        Self {
359            file_decryption_properties: Some(file_decryption_properties),
360            ..self
361        }
362    }
363}
364
/// The metadata necessary to construct a [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several arcs.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known a priori
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema
    pub(crate) schema: SchemaRef,

    /// Parsed parquet-to-arrow field information, used to construct array
    /// readers for this file
    pub(crate) fields: Option<Arc<ParquetField>>,
}
388
389impl ArrowReaderMetadata {
390    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
391    ///
392    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
393    /// example of how this can be used
394    ///
395    /// # Notes
396    ///
397    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
398    /// `Self::metadata` is missing the page index, this function will attempt
399    /// to load the page index by making an object store request.
400    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
401        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
402        #[cfg(feature = "encryption")]
403        let metadata =
404            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
405        let metadata = metadata.parse_and_finish(reader)?;
406        Self::try_new(Arc::new(metadata), options)
407    }
408
409    /// Create a new [`ArrowReaderMetadata`]
410    ///
411    /// # Notes
412    ///
413    /// This function does not attempt to load the PageIndex if not present in the metadata.
414    /// See [`Self::load`] for more details.
415    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
416        match options.supplied_schema {
417            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
418            None => {
419                let kv_metadata = match options.skip_arrow_metadata {
420                    true => None,
421                    false => metadata.file_metadata().key_value_metadata(),
422                };
423
424                let (schema, fields) = parquet_to_arrow_schema_and_fields(
425                    metadata.file_metadata().schema_descr(),
426                    ProjectionMask::all(),
427                    kv_metadata,
428                )?;
429
430                Ok(Self {
431                    metadata,
432                    schema: Arc::new(schema),
433                    fields: fields.map(Arc::new),
434                })
435            }
436        }
437    }
438
439    fn with_supplied_schema(
440        metadata: Arc<ParquetMetaData>,
441        supplied_schema: SchemaRef,
442    ) -> Result<Self> {
443        let parquet_schema = metadata.file_metadata().schema_descr();
444        let field_levels = parquet_to_arrow_field_levels(
445            parquet_schema,
446            ProjectionMask::all(),
447            Some(supplied_schema.fields()),
448        )?;
449        let fields = field_levels.fields;
450        let inferred_len = fields.len();
451        let supplied_len = supplied_schema.fields().len();
452        // Ensure the supplied schema has the same number of columns as the parquet schema.
453        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
454        // different lengths, but we check here to be safe.
455        if inferred_len != supplied_len {
456            Err(arrow_err!(format!(
457                "incompatible arrow schema, expected {} columns received {}",
458                inferred_len, supplied_len
459            )))
460        } else {
461            let diff_fields: Vec<_> = supplied_schema
462                .fields()
463                .iter()
464                .zip(fields.iter())
465                .filter_map(|(field1, field2)| {
466                    if field1 != field2 {
467                        Some(field1.name().clone())
468                    } else {
469                        None
470                    }
471                })
472                .collect();
473
474            if !diff_fields.is_empty() {
475                Err(ParquetError::ArrowError(format!(
476                    "incompatible arrow schema, the following fields could not be cast: [{}]",
477                    diff_fields.join(", ")
478                )))
479            } else {
480                Ok(Self {
481                    metadata,
482                    schema: supplied_schema,
483                    fields: field_levels.levels.map(Arc::new),
484                })
485            }
486        }
487    }
488
489    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
490    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
491        &self.metadata
492    }
493
494    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
495    pub fn parquet_schema(&self) -> &SchemaDescriptor {
496        self.metadata.file_metadata().schema_descr()
497    }
498
499    /// Returns the arrow [`SchemaRef`] for this parquet file
500    pub fn schema(&self) -> &SchemaRef {
501        &self.schema
502    }
503}
504
#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);
508
/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
///
/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
///
/// Instances are created with [`ParquetRecordBatchReaderBuilder::try_new`],
/// [`ParquetRecordBatchReaderBuilder::try_new_with_options`] or
/// [`ParquetRecordBatchReaderBuilder::new_with_metadata`]
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
515
516impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
517    /// Create a new [`ParquetRecordBatchReaderBuilder`]
518    ///
519    /// ```
520    /// # use std::sync::Arc;
521    /// # use bytes::Bytes;
522    /// # use arrow_array::{Int32Array, RecordBatch};
523    /// # use arrow_schema::{DataType, Field, Schema};
524    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
525    /// # use parquet::arrow::ArrowWriter;
526    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
527    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
528    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
529    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
530    /// # writer.write(&batch).unwrap();
531    /// # writer.close().unwrap();
532    /// # let file = Bytes::from(file);
533    /// #
534    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
535    ///
536    /// // Inspect metadata
537    /// assert_eq!(builder.metadata().num_row_groups(), 1);
538    ///
539    /// // Construct reader
540    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
541    ///
542    /// // Read data
543    /// let _batch = reader.next().unwrap().unwrap();
544    /// ```
545    pub fn try_new(reader: T) -> Result<Self> {
546        Self::try_new_with_options(reader, Default::default())
547    }
548
549    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
550    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
551        let metadata = ArrowReaderMetadata::load(&reader, options)?;
552        Ok(Self::new_with_metadata(reader, metadata))
553    }
554
555    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
556    ///
557    /// This interface allows:
558    ///
559    /// 1. Loading metadata once and using it to create multiple builders with
560    ///    potentially different settings or run on different threads
561    ///
562    /// 2. Using a cached copy of the metadata rather than re-reading it from the
563    ///    file each time a reader is constructed.
564    ///
565    /// See the docs on [`ArrowReaderMetadata`] for more details
566    ///
567    /// # Example
568    /// ```
569    /// # use std::fs::metadata;
570    /// # use std::sync::Arc;
571    /// # use bytes::Bytes;
572    /// # use arrow_array::{Int32Array, RecordBatch};
573    /// # use arrow_schema::{DataType, Field, Schema};
574    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
575    /// # use parquet::arrow::ArrowWriter;
576    /// #
577    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
578    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
579    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
580    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
581    /// # writer.write(&batch).unwrap();
582    /// # writer.close().unwrap();
583    /// # let file = Bytes::from(file);
584    /// #
585    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
586    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
587    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
588    ///
589    /// // Should be able to read from both in parallel
590    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
591    /// ```
592    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
593        Self::new_builder(SyncReader(input), metadata)
594    }
595
596    /// Build a [`ParquetRecordBatchReader`]
597    ///
598    /// Note: this will eagerly evaluate any `RowFilter` before returning
599    pub fn build(self) -> Result<ParquetRecordBatchReader> {
600        // Try to avoid allocate large buffer
601        let batch_size = self
602            .batch_size
603            .min(self.metadata.file_metadata().num_rows() as usize);
604
605        let row_groups = self
606            .row_groups
607            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
608
609        let reader = ReaderRowGroups {
610            reader: Arc::new(self.input.0),
611            metadata: self.metadata,
612            row_groups,
613        };
614
615        let mut filter = self.filter;
616        let mut selection = self.selection;
617
618        if let Some(filter) = filter.as_mut() {
619            for predicate in filter.predicates.iter_mut() {
620                if !selects_any(selection.as_ref()) {
621                    break;
622                }
623
624                let array_reader =
625                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
626
627                selection = Some(evaluate_predicate(
628                    batch_size,
629                    array_reader,
630                    selection,
631                    predicate.as_mut(),
632                )?);
633            }
634        }
635
636        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
637
638        // If selection is empty, truncate
639        if !selects_any(selection.as_ref()) {
640            selection = Some(RowSelection::from(vec![]));
641        }
642
643        Ok(ParquetRecordBatchReader::new(
644            batch_size,
645            array_reader,
646            apply_range(selection, reader.num_rows(), self.offset, self.limit),
647        ))
648    }
649}
650
/// [`RowGroups`] implementation backed by a [`ChunkReader`], restricted to a
/// subset of the file's row groups
struct ReaderRowGroups<T: ChunkReader> {
    /// The underlying data source
    reader: Arc<T>,

    /// The parquet metadata for the whole file
    metadata: Arc<ParquetMetaData>,
    /// Optional list of row group indices to scan
    row_groups: Vec<usize>,
}
658
659impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
660    fn num_rows(&self) -> usize {
661        let meta = self.metadata.row_groups();
662        self.row_groups
663            .iter()
664            .map(|x| meta[*x].num_rows() as usize)
665            .sum()
666    }
667
668    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
669        Ok(Box::new(ReaderPageIterator {
670            column_idx: i,
671            reader: self.reader.clone(),
672            metadata: self.metadata.clone(),
673            row_groups: self.row_groups.clone().into_iter(),
674        }))
675    }
676}
677
/// A [`PageIterator`] yielding one [`SerializedPageReader`] per row group for
/// a single column
struct ReaderPageIterator<T: ChunkReader> {
    /// The underlying data source
    reader: Arc<T>,
    /// The index of the column being read
    column_idx: usize,
    /// The remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// The parquet metadata for the whole file
    metadata: Arc<ParquetMetaData>,
}
684
685impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
686    /// Return the next SerializedPageReader
687    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
688        let rg = self.metadata.row_group(rg_idx);
689        let column_chunk_metadata = rg.column(self.column_idx);
690        let offset_index = self.metadata.offset_index();
691        // `offset_index` may not exist and `i[rg_idx]` will be empty.
692        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
693        let page_locations = offset_index
694            .filter(|i| !i[rg_idx].is_empty())
695            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
696        let total_rows = rg.num_rows() as usize;
697        let reader = self.reader.clone();
698
699        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
700            .add_crypto_context(
701                rg_idx,
702                self.column_idx,
703                self.metadata.as_ref(),
704                column_chunk_metadata,
705            )
706    }
707}
708
709impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
710    type Item = Result<Box<dyn PageReader>>;
711
712    fn next(&mut self) -> Option<Self::Item> {
713        let rg_idx = self.row_groups.next()?;
714        let page_reader = self
715            .next_page_reader(rg_idx)
716            .map(|page_reader| Box::new(page_reader) as _);
717        Some(page_reader)
718    }
719}
720
// Marker impl allowing `ReaderPageIterator` (one page reader per row group) to
// be used wherever a [`PageIterator`] is expected
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
722
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
    /// Maximum number of rows per emitted [`RecordBatch`]
    batch_size: usize,
    /// Decodes the selected columns into a struct array
    array_reader: Box<dyn ArrayReader>,
    /// The projected arrow schema of the emitted batches
    schema: SchemaRef,
    /// Remaining row selectors to apply; `None` means read all rows
    selection: Option<VecDeque<RowSelector>>,
}
731
impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    /// Reads the next batch of up to `batch_size` rows, honoring the row
    /// selection (if any). Returns `None` when the underlying reader yields no
    /// more rows.
    fn next(&mut self) -> Option<Self::Item> {
        // Number of rows buffered in the array reader for this batch so far
        let mut read_records = 0;
        match self.selection.as_mut() {
            Some(selection) => {
                // Consume selectors from the front of the queue until a full
                // batch has been buffered or the selection is exhausted
                while read_records < self.batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = match self.array_reader.skip_records(front.row_count) {
                            Ok(skipped) => skipped,
                            Err(e) => return Some(Err(e.into())),
                        };

                        // A short skip means the data ran out before the
                        // selection did - surface this as an error
                        if skipped != front.row_count {
                            return Some(Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            )
                            .into()));
                        }
                        continue;
                    }

                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
                    if front.row_count == 0 {
                        continue;
                    }

                    // try to read record
                    let need_read = self.batch_size - read_records;
                    // If this selector covers more rows than remain in the
                    // batch, read only what is needed and push the remainder
                    // back onto the queue for the next batch
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // if page row count less than batch_size we must set batch size to page row count.
                            // add check avoid dead loop
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read) {
                        // Reading zero rows means the data source is exhausted
                        Ok(0) => break,
                        Ok(rec) => read_records += rec,
                        Err(error) => return Some(Err(error.into())),
                    }
                }
            }
            None => {
                // No selection: read rows sequentially in batch_size chunks
                if let Err(error) = self.array_reader.read_records(self.batch_size) {
                    return Some(Err(error.into()));
                }
            }
        };

        // Convert whatever was buffered above into a RecordBatch; a zero-length
        // struct array signals end-of-stream and yields None
        match self.array_reader.consume_batch() {
            Err(error) => Some(Err(error.into())),
            Ok(array) => {
                let struct_array = array.as_struct_opt().ok_or_else(|| {
                    ArrowError::ParquetError(
                        "Struct array reader should return struct array".to_string(),
                    )
                });

                match struct_array {
                    Err(err) => Some(Err(err)),
                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
                }
            }
        }
    }
}
806
807impl RecordBatchReader for ParquetRecordBatchReader {
808    /// Returns the projected [`SchemaRef`] for reading the parquet file.
809    ///
810    /// Note that the schema metadata will be stripped here. See
811    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
812    fn schema(&self) -> SchemaRef {
813        self.schema.clone()
814    }
815}
816
817impl ParquetRecordBatchReader {
818    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
819    ///
820    /// See [`ParquetRecordBatchReaderBuilder`] for more options
821    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
822        ParquetRecordBatchReaderBuilder::try_new(reader)?
823            .with_batch_size(batch_size)
824            .build()
825    }
826
827    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
828    ///
829    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
830    /// higher-level interface for reading parquet data from a file
831    pub fn try_new_with_row_groups(
832        levels: &FieldLevels,
833        row_groups: &dyn RowGroups,
834        batch_size: usize,
835        selection: Option<RowSelection>,
836    ) -> Result<Self> {
837        let array_reader =
838            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
839
840        Ok(Self {
841            batch_size,
842            array_reader,
843            schema: Arc::new(Schema::new(levels.fields.clone())),
844            selection: selection.map(|s| s.trim().into()),
845        })
846    }
847
848    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
849    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
850    /// all rows will be returned
851    pub(crate) fn new(
852        batch_size: usize,
853        array_reader: Box<dyn ArrayReader>,
854        selection: Option<RowSelection>,
855    ) -> Self {
856        let schema = match array_reader.get_data_type() {
857            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
858            _ => unreachable!("Struct array reader's data type is not struct!"),
859        };
860
861        Self {
862            batch_size,
863            array_reader,
864            schema: Arc::new(schema),
865            selection: selection.map(|s| s.trim().into()),
866        }
867    }
868}
869
870/// Returns `true` if `selection` is `None` or selects some rows
871pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
872    selection.map(|x| x.selects_any()).unwrap_or(true)
873}
874
875/// Applies an optional offset and limit to an optional [`RowSelection`]
876pub(crate) fn apply_range(
877    mut selection: Option<RowSelection>,
878    row_count: usize,
879    offset: Option<usize>,
880    limit: Option<usize>,
881) -> Option<RowSelection> {
882    // If an offset is defined, apply it to the `selection`
883    if let Some(offset) = offset {
884        selection = Some(match row_count.checked_sub(offset) {
885            None => RowSelection::from(vec![]),
886            Some(remaining) => selection
887                .map(|selection| selection.offset(offset))
888                .unwrap_or_else(|| {
889                    RowSelection::from(vec![
890                        RowSelector::skip(offset),
891                        RowSelector::select(remaining),
892                    ])
893                }),
894        });
895    }
896
897    // If a limit is defined, apply it to the final `selection`
898    if let Some(limit) = limit {
899        selection = Some(
900            selection
901                .map(|selection| selection.limit(limit))
902                .unwrap_or_else(|| {
903                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
904                }),
905        );
906    }
907    selection
908}
909
910/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
911/// which rows to return.
912///
913/// `input_selection`: Optional pre-existing selection. If `Some`, then the
914/// final [`RowSelection`] will be the conjunction of it and the rows selected
915/// by `predicate`.
916///
917/// Note: A pre-existing selection may come from evaluating a previous predicate
918/// or if the [`ParquetRecordBatchReader`] specified an explicit
919/// [`RowSelection`] in addition to one or more predicates.
920pub(crate) fn evaluate_predicate(
921    batch_size: usize,
922    array_reader: Box<dyn ArrayReader>,
923    input_selection: Option<RowSelection>,
924    predicate: &mut dyn ArrowPredicate,
925) -> Result<RowSelection> {
926    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
927    let mut filters = vec![];
928    for maybe_batch in reader {
929        let maybe_batch = maybe_batch?;
930        let input_rows = maybe_batch.num_rows();
931        let filter = predicate.evaluate(maybe_batch)?;
932        // Since user supplied predicate, check error here to catch bugs quickly
933        if filter.len() != input_rows {
934            return Err(arrow_err!(
935                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
936                filter.len()
937            ));
938        }
939        match filter.null_count() {
940            0 => filters.push(filter),
941            _ => filters.push(prep_null_mask_filter(&filter)),
942        };
943    }
944
945    let raw = RowSelection::from_filters(&filters);
946    Ok(match input_selection {
947        Some(selection) => selection.and_then(&raw),
948        None => raw,
949    })
950}
951
952#[cfg(test)]
953mod tests {
954    use std::cmp::min;
955    use std::collections::{HashMap, VecDeque};
956    use std::fmt::Formatter;
957    use std::fs::File;
958    use std::io::Seek;
959    use std::path::PathBuf;
960    use std::sync::Arc;
961
962    use bytes::Bytes;
963    use half::f16;
964    use num::PrimInt;
965    use rand::{rng, Rng, RngCore};
966    use tempfile::tempfile;
967
968    use arrow_array::builder::*;
969    use arrow_array::cast::AsArray;
970    use arrow_array::types::{
971        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
972        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
973    };
974    use arrow_array::*;
975    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
976    use arrow_data::{ArrayData, ArrayDataBuilder};
977    use arrow_schema::{
978        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
979    };
980    use arrow_select::concat::concat_batches;
981
982    use crate::arrow::arrow_reader::{
983        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
984        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
985    };
986    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
987    use crate::arrow::{ArrowWriter, ProjectionMask};
988    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
989    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
990    use crate::data_type::{
991        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
992        FloatType, Int32Type, Int64Type, Int96, Int96Type,
993    };
994    use crate::errors::Result;
995    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
996    use crate::file::writer::SerializedFileWriter;
997    use crate::schema::parser::parse_message_type;
998    use crate::schema::types::{Type, TypePtr};
999    use crate::util::test_common::rand_gen::RandGen;
1000
    // Reading with the default (full) projection preserves the file's schema
    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(original_schema.fields(), reader.schema().fields());
    }
1012
    // Projecting a single leaf by index yields a one-field schema matching the original
    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }
1027
    // Projecting a single column by name behaves like projecting it by index
    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }
1042
    // An all-null OPTIONAL INT32 column read as Arrow `Null` type: 7 rows with
    // batch_size 2 produce three full batches plus one single-row batch, all nulls
    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }
1081
    // Round-trip the signed primitive physical types through each supported encoding
    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }
1123
    // Signed physical storage with UINT converted types must surface as unsigned Arrow arrays
    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }
1157
    // Unsigned extremes (including values above the signed max) round-trip exactly
    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        // Check they can be downcast to the correct type
        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }
1203
    // Float16 special values (NaN, infinities, signed zeros, ...) round-trip exactly,
    // in both non-nullable and nullable columns
    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure can be downcast to the correct type
        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }
1263
    // Time32/Time64 columns flagged adjusted-to-UTC round-trip, including
    // out-of-range values (negative and > 24h)
    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure can be downcast to the correct type
        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }
1325
    // Date32 values across a wide positive/negative range round-trip exactly
    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure can be downcast to the correct type
        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }
1359
    // Date64 round-trips losslessly with the default writer; with coerce_types
    // enabled, values that are not exact day multiples (or overflow Date32) are
    // altered by the coercion and must NOT compare equal after the round-trip
    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Roundtrip should be successful when default writer used
        assert_eq!(default_ret, original);

        // Only small-date64 should roundtrip successfully when coerce_types writer is used
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        // Ensure both can be downcast to the correct type
        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }
    // Zero-sized generator of random fixed-length byte arrays (see RandGen impl below)
    struct RandFixedLenGen {}
1438
    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        // Produce `len` random bytes wrapped as a FixedLenByteArray
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }
1446
    // FIXED_LEN_BYTE_ARRAY(20) columns read back as Arrow FixedSizeBinary
    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }
1466
    // 12-byte INTERVAL values map to Arrow IntervalDayTime; days/millis are
    // decoded little-endian from bytes [4..8] and [8..12] respectively
    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }
1490
    // INT96 timestamps convert to every Timestamp TimeUnit (default: nanoseconds),
    // with and without an explicit ArrowType hint, including a timezone-carrying hint
    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // Test without a specified ArrowType hint.
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Test other TimeUnits as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Test another timezone with TimeUnit as ArrowType hints.
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }
1565
    // Zero-sized generator of random UTF-8 byte arrays (see RandGen impl below)
    struct RandUtf8Gen {}
1567
    impl RandGen<ByteArrayType> for RandUtf8Gen {
        // Random i32 rendered as its decimal string — always valid UTF-8
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }
1573
    #[test]
    fn test_utf8_single_column_reader_test() {
        // Build the expected arrow array from the raw parquet values: each
        // ByteArray is interpreted as UTF-8 and collected into a
        // Generic(Large)StringArray with offset type `O`.
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        // Every byte-array encoding the writer supports for this column type
        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // ConvertedType::NONE with no arrow type hint: values read back as binary
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // ConvertedType::UTF8 with no arrow type hint: values read back as strings
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        // Explicit Utf8 hint (i32 offsets)
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        // Explicit LargeUtf8 hint (i64 offsets)
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // Dictionary-encoded output with 8-bit keys: the full suite writes more
        // distinct values than an 8-bit key space can hold, so use a reduced
        // single-run configuration instead.
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                // Cannot run full test suite as keys overflow, run small test instead
                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Dictionary-encoded output with key types wide enough for the full suite
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            // Dictionary-of-LargeUtf8 is not yet supported; tracked upstream:
            // https://github.com/apache/arrow-rs/issues/1179
            // let data_type = ArrowDataType::Dictionary(
            //     Box::new(key.clone()),
            //     Box::new(ArrowDataType::LargeUtf8),
            // );
            //
            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            //     2,
            //     ConvertedType::UTF8,
            //     Some(data_type.clone()),
            //     move |vals| {
            //         let vals = string_converter::<i64>(vals);
            //         arrow::compute::cast(&vals, &data_type).unwrap()
            //     },
            //     encodings,
            // );
        }
    }
1690
1691    #[test]
1692    fn test_decimal_nullable_struct() {
1693        let decimals = Decimal256Array::from_iter_values(
1694            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1695        );
1696
1697        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1698            "decimals",
1699            decimals.data_type().clone(),
1700            false,
1701        )])))
1702        .len(8)
1703        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1704        .child_data(vec![decimals.into_data()])
1705        .build()
1706        .unwrap();
1707
1708        let written =
1709            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1710                .unwrap();
1711
1712        let mut buffer = Vec::with_capacity(1024);
1713        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1714        writer.write(&written).unwrap();
1715        writer.close().unwrap();
1716
1717        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1718            .unwrap()
1719            .collect::<Result<Vec<_>, _>>()
1720            .unwrap();
1721
1722        assert_eq!(&written.slice(0, 3), &read[0]);
1723        assert_eq!(&written.slice(3, 3), &read[1]);
1724        assert_eq!(&written.slice(6, 2), &read[2]);
1725    }
1726
1727    #[test]
1728    fn test_int32_nullable_struct() {
1729        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1730        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1731            "int32",
1732            int32.data_type().clone(),
1733            false,
1734        )])))
1735        .len(8)
1736        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1737        .child_data(vec![int32.into_data()])
1738        .build()
1739        .unwrap();
1740
1741        let written =
1742            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1743                .unwrap();
1744
1745        let mut buffer = Vec::with_capacity(1024);
1746        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1747        writer.write(&written).unwrap();
1748        writer.close().unwrap();
1749
1750        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1751            .unwrap()
1752            .collect::<Result<Vec<_>, _>>()
1753            .unwrap();
1754
1755        assert_eq!(&written.slice(0, 3), &read[0]);
1756        assert_eq!(&written.slice(3, 3), &read[1]);
1757        assert_eq!(&written.slice(6, 2), &read[2]);
1758    }
1759
    #[test]
    fn test_decimal_list() {
        // Roundtrip a nullable list column whose (non-null) items are Decimal128.
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        // Offsets partition the child values; validity bits 3 and 5 are clear,
        // making list rows 3 and 5 null.
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: 7 rows split into batches of 3, 3 and 1.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
1794
1795    #[test]
1796    fn test_read_decimal_file() {
1797        use arrow_array::Decimal128Array;
1798        let testdata = arrow::util::test_util::parquet_test_data();
1799        let file_variants = vec![
1800            ("byte_array", 4),
1801            ("fixed_length", 25),
1802            ("int32", 4),
1803            ("int64", 10),
1804        ];
1805        for (prefix, target_precision) in file_variants {
1806            let path = format!("{testdata}/{prefix}_decimal.parquet");
1807            let file = File::open(path).unwrap();
1808            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1809
1810            let batch = record_reader.next().unwrap().unwrap();
1811            assert_eq!(batch.num_rows(), 24);
1812            let col = batch
1813                .column(0)
1814                .as_any()
1815                .downcast_ref::<Decimal128Array>()
1816                .unwrap();
1817
1818            let expected = 1..25;
1819
1820            assert_eq!(col.precision(), target_precision);
1821            assert_eq!(col.scale(), 2);
1822
1823            for (i, v) in expected.enumerate() {
1824                assert_eq!(col.value(i), v * 100_i128);
1825            }
1826        }
1827    }
1828
    #[test]
    fn test_read_float16_nonzeros_file() {
        // Reads the reference float16 file and checks each value, including
        // signed zeros and NaN. Expected column contents:
        // [null, 1.0, -2.0, NaN, +0.0, -1.0, -0.0, 2.0]
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        // see https://github.com/apache/parquet-testing/pull/40
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        // f16 has no TWO constant; derive it
        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        // +0.0 and -0.0 compare equal, so also check the sign bit explicitly
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }
1860
1861    #[test]
1862    fn test_read_float16_zeros_file() {
1863        use arrow_array::Float16Array;
1864        let testdata = arrow::util::test_util::parquet_test_data();
1865        // see https://github.com/apache/parquet-testing/pull/40
1866        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1867        let file = File::open(path).unwrap();
1868        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1869
1870        let batch = record_reader.next().unwrap().unwrap();
1871        assert_eq!(batch.num_rows(), 3);
1872        let col = batch
1873            .column(0)
1874            .as_any()
1875            .downcast_ref::<Float16Array>()
1876            .unwrap();
1877
1878        assert_eq!(col.null_count(), 1);
1879        assert!(col.is_null(0));
1880        assert_eq!(col.value(1), f16::ZERO);
1881        assert!(col.value(1).is_sign_positive());
1882        assert!(col.value(2).is_nan());
1883    }
1884
1885    #[test]
1886    fn test_read_float32_float64_byte_stream_split() {
1887        let path = format!(
1888            "{}/byte_stream_split.zstd.parquet",
1889            arrow::util::test_util::parquet_test_data(),
1890        );
1891        let file = File::open(path).unwrap();
1892        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1893
1894        let mut row_count = 0;
1895        for batch in record_reader {
1896            let batch = batch.unwrap();
1897            row_count += batch.num_rows();
1898            let f32_col = batch.column(0).as_primitive::<Float32Type>();
1899            let f64_col = batch.column(1).as_primitive::<Float64Type>();
1900
1901            // This file contains floats from a standard normal distribution
1902            for &x in f32_col.values() {
1903                assert!(x > -10.0);
1904                assert!(x < 10.0);
1905            }
1906            for &x in f64_col.values() {
1907                assert!(x > -10.0);
1908                assert!(x < 10.0);
1909            }
1910        }
1911        assert_eq!(row_count, 300);
1912    }
1913
    /// Columns in this file come in pairs holding identical values; per the
    /// file name, the second column of each pair is presumably encoded with
    /// BYTE_STREAM_SPLIT (TODO confirm against parquet-testing), so comparing
    /// the pairs element-wise exercises that decoder for every supported
    /// physical type.
    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // 0,1 are f16
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 2,3 are f32
            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 4,5 are f64
            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 6,7 are i32
            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 8,9 are i64
            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 10,11 are FLBA(5)
            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // 12,13 are FLBA(4) (decimal(7,3)) — read back as Decimal128
            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }
1993
    #[test]
    fn test_read_incorrect_map_schema_file() {
        // The file's map column was written with a non-standard schema; the
        // reader should still expose it under the standard
        // `key_value { key, value }` layout asserted below.
        let testdata = arrow::util::test_util::parquet_test_data();
        // see https://github.com/apache/parquet-testing/pull/47
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Expected normalized arrow schema for the map column
        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        // The single row holds {"parent": "another", "name": "report"}
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }
2033
    /// Parameters for single_column_reader_test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write to parquet (the file holds
        /// `num_row_groups * num_rows` rows in total)
        num_row_groups: usize,
        /// Number of rows in each row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in column or None if required
        null_percent: Option<usize>,
        /// Set write batch size
        ///
        /// This is the number of rows that are written at once to a page and
        /// therefore acts as a bound on the page granularity of a row group
        write_batch_size: usize,
        /// Maximum size of page in bytes
        max_data_page_size: usize,
        /// Maximum size of dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// Row selection to push down, together with the number of rows it
        /// selects, or None to read everything
        row_selections: Option<(RowSelection, usize)>,
        /// Per-row keep/discard mask used to build a row filter, or None
        row_filter: Option<Vec<bool>>,
        /// Maximum number of rows to read (limit pushdown), or None
        limit: Option<usize>,
        /// Number of rows to skip before reading (offset pushdown), or None
        offset: Option<usize>,
    }
2070
    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                // Only report presence for the potentially huge fields
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }
2092
2093    impl Default for TestOptions {
2094        fn default() -> Self {
2095            Self {
2096                num_row_groups: 2,
2097                num_rows: 100,
2098                record_batch_size: 15,
2099                null_percent: None,
2100                write_batch_size: 64,
2101                max_data_page_size: 1024 * 1024,
2102                max_dict_page_size: 1024 * 1024,
2103                writer_version: WriterVersion::PARQUET_1_0,
2104                enabled_statistics: EnabledStatistics::Page,
2105                encoding: Encoding::PLAIN,
2106                row_selections: None,
2107                row_filter: None,
2108                limit: None,
2109                offset: None,
2110            }
2111        }
2112    }
2113
2114    impl TestOptions {
2115        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2116            Self {
2117                num_row_groups,
2118                num_rows,
2119                record_batch_size,
2120                ..Default::default()
2121            }
2122        }
2123
2124        fn with_null_percent(self, null_percent: usize) -> Self {
2125            Self {
2126                null_percent: Some(null_percent),
2127                ..self
2128            }
2129        }
2130
2131        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2132            Self {
2133                max_data_page_size,
2134                ..self
2135            }
2136        }
2137
2138        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2139            Self {
2140                max_dict_page_size,
2141                ..self
2142            }
2143        }
2144
2145        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2146            Self {
2147                enabled_statistics,
2148                ..self
2149            }
2150        }
2151
2152        fn with_row_selections(self) -> Self {
2153            assert!(self.row_filter.is_none(), "Must set row selection first");
2154
2155            let mut rng = rng();
2156            let step = rng.random_range(self.record_batch_size..self.num_rows);
2157            let row_selections = create_test_selection(
2158                step,
2159                self.num_row_groups * self.num_rows,
2160                rng.random::<bool>(),
2161            );
2162            Self {
2163                row_selections: Some(row_selections),
2164                ..self
2165            }
2166        }
2167
2168        fn with_row_filter(self) -> Self {
2169            let row_count = match &self.row_selections {
2170                Some((_, count)) => *count,
2171                None => self.num_row_groups * self.num_rows,
2172            };
2173
2174            let mut rng = rng();
2175            Self {
2176                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2177                ..self
2178            }
2179        }
2180
2181        fn with_limit(self, limit: usize) -> Self {
2182            Self {
2183                limit: Some(limit),
2184                ..self
2185            }
2186        }
2187
2188        fn with_offset(self, offset: usize) -> Self {
2189            Self {
2190                offset: Some(offset),
2191                ..self
2192            }
2193        }
2194
2195        fn writer_props(&self) -> WriterProperties {
2196            let builder = WriterProperties::builder()
2197                .set_data_page_size_limit(self.max_data_page_size)
2198                .set_write_batch_size(self.write_batch_size)
2199                .set_writer_version(self.writer_version)
2200                .set_statistics_enabled(self.enabled_statistics);
2201
2202            let builder = match self.encoding {
2203                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2204                    .set_dictionary_enabled(true)
2205                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2206                _ => builder
2207                    .set_dictionary_enabled(false)
2208                    .set_encoding(self.encoding),
2209            };
2210
2211            builder.build()
2212        }
2213    }
2214
    /// Create a parquet file and then read it using
    /// `ParquetFileArrowReader` using a standard set of parameters
    /// `opts`.
    ///
    /// `rand_max` represents the maximum size of value to pass to the
    /// value generator
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // choose record_batch_batch (15) so batches cross row
            // group boundaries (50 rows in 2 row groups) cases.
            TestOptions::new(2, 100, 15),
            // choose record_batch_batch (5) so batches sometime fall
            // on row group boundaries and (25 rows in 3 row groups
            // --> row groups of 10, 10, and 5). Tests buffer
            // refilling edge cases.
            TestOptions::new(3, 25, 5),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundary (25). Tests buffer
            // refilling edge cases.
            TestOptions::new(4, 100, 25),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127).with_null_percent(0),
            // Test optional with nulls
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Test with limit of 0
            TestOptions::new(4, 100, 25).with_limit(0),
            // Test with limit of 50
            TestOptions::new(4, 100, 25).with_limit(50),
            // Test with limit of 10
            TestOptions::new(4, 100, 25).with_limit(10),
            // Test with limit larger than number of rows
            TestOptions::new(4, 100, 25).with_limit(101),
            // Test with limit + offset less than number of rows
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            // Test with limit + offset equal to number of rows
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            // Test with limit + offset larger than number of rows
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Test with no page-level statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            // Test with no statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test with all null
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // Test skip

            // choose record_batch_batch (15) so batches cross row
            // group boundaries (50 rows in 2 row groups) cases.
            TestOptions::new(2, 100, 15).with_row_selections(),
            // choose record_batch_batch (5) so batches sometime fall
            // on row group boundaries and (25 rows in 3 row groups
            // --> row groups of 10, 10, and 5). Tests buffer
            // refilling edge cases.
            TestOptions::new(3, 25, 5).with_row_selections(),
            // Choose record_batch_size (25) so all batches fall
            // exactly on row group boundary (25). Tests buffer
            // refilling edge cases.
            TestOptions::new(4, 100, 25).with_row_selections(),
            // Set maximum page size so row groups have multiple pages
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            // Set small dictionary page size to test dictionary fallback
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            // Test optional but with no nulls
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            // Test optional with nulls
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            // Test optional with nulls
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            // Test optional with nulls
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Test filter

            // Test with row filter
            TestOptions::new(4, 100, 25).with_row_filter(),
            // Test with row selection and row filter
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            // Test with nulls and row filter
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            // Test with nulls and row filter and small pages
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            // Test with row selection and no offset index and small pages
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        // Run every option set against both writer versions and every
        // requested encoding
        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }
2366
    /// Create a parquet file and then read it using
    /// `ParquetFileArrowReader` using the parameters described in
    /// `opts`.
    ///
    /// Writes `opts.num_row_groups` row groups of `opts.num_rows` values
    /// generated by `G` (with nulls when `opts.null_percent` is set), then
    /// reads the file back applying any row selection / row filter / offset /
    /// limit from `opts` and checks the decoded batches against the written
    /// data (converted to the expected arrow array via `converter`).
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print out options to facilitate debugging failures on CI
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(), converted_type, arrow_type, opts
        );

        //according to null_percent generate def_levels
        // (definition level 1 => value present, 0 => null; the column is only
        // OPTIONAL when a null percentage was requested)
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        //generate random table data
        // (only non-null slots carry a value, so subtract each group's null count)
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        // FIXED_LEN_BYTE_ARRAY needs an explicit byte length (reuse rand_max),
        // INT96 is always 12 bytes; -1 leaves the length unset for other types
        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Only load the page index when page-level statistics were written
        let options = ArrowReaderOptions::new()
            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                // NOTE(review): despite its name, `skip_data` accumulates the rows
                // that are *selected* (not skipped) by the row selection
                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                //get flatten table data
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                // Apply the boolean mask to the expected rows...
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then(|| d))
                    .collect();

                // ...and install an ArrowPredicate that replays the same mask,
                // with `filter_offset` tracking progress across record batches
                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        // Offset and limit apply after selection and filtering, in that order
        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        // Read batch-by-batch: every batch must be full-sized (except possibly
        // the last) and its column must match the converted expected slice
        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = Arc::clone(batch.column(0));

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                // Once all expected rows are consumed the reader must be exhausted
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }
2558
2559    fn gen_expected_data<T: DataType>(
2560        def_levels: Option<&Vec<Vec<i16>>>,
2561        values: &[Vec<T::T>],
2562    ) -> Vec<Option<T::T>> {
2563        let data: Vec<Option<T::T>> = match def_levels {
2564            Some(levels) => {
2565                let mut values_iter = values.iter().flatten();
2566                levels
2567                    .iter()
2568                    .flatten()
2569                    .map(|d| match d {
2570                        1 => Some(values_iter.next().cloned().unwrap()),
2571                        0 => None,
2572                        _ => unreachable!(),
2573                    })
2574                    .collect()
2575            }
2576            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2577        };
2578        data
2579    }
2580
2581    fn generate_single_column_file_with_data<T: DataType>(
2582        values: &[Vec<T::T>],
2583        def_levels: Option<&Vec<Vec<i16>>>,
2584        file: File,
2585        schema: TypePtr,
2586        field: Option<Field>,
2587        opts: &TestOptions,
2588    ) -> Result<crate::format::FileMetaData> {
2589        let mut writer_props = opts.writer_props();
2590        if let Some(field) = field {
2591            let arrow_schema = Schema::new(vec![field]);
2592            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2593        }
2594
2595        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2596
2597        for (idx, v) in values.iter().enumerate() {
2598            let def_levels = def_levels.map(|d| d[idx].as_slice());
2599            let mut row_group_writer = writer.next_row_group()?;
2600            {
2601                let mut column_writer = row_group_writer
2602                    .next_column()?
2603                    .expect("Column writer is none!");
2604
2605                column_writer
2606                    .typed::<T>()
2607                    .write_batch(v, def_levels, None)?;
2608
2609                column_writer.close()?;
2610            }
2611            row_group_writer.close()?;
2612        }
2613
2614        writer.close()
2615    }
2616
2617    fn get_test_file(file_name: &str) -> File {
2618        let mut path = PathBuf::new();
2619        path.push(arrow::util::test_util::arrow_test_data());
2620        path.push(file_name);
2621
2622        File::open(path.as_path()).expect("File not found!")
2623    }
2624
2625    #[test]
2626    fn test_read_structs() {
2627        // This particular test file has columns of struct types where there is
2628        // a column that has the same name as one of the struct fields
2629        // (see: ARROW-11452)
2630        let testdata = arrow::util::test_util::parquet_test_data();
2631        let path = format!("{testdata}/nested_structs.rust.parquet");
2632        let file = File::open(&path).unwrap();
2633        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2634
2635        for batch in record_batch_reader {
2636            batch.unwrap();
2637        }
2638
2639        let file = File::open(&path).unwrap();
2640        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2641
2642        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2643        let projected_reader = builder
2644            .with_projection(mask)
2645            .with_batch_size(60)
2646            .build()
2647            .unwrap();
2648
2649        let expected_schema = Schema::new(vec![
2650            Field::new(
2651                "roll_num",
2652                ArrowDataType::Struct(Fields::from(vec![Field::new(
2653                    "count",
2654                    ArrowDataType::UInt64,
2655                    false,
2656                )])),
2657                false,
2658            ),
2659            Field::new(
2660                "PC_CUR",
2661                ArrowDataType::Struct(Fields::from(vec![
2662                    Field::new("mean", ArrowDataType::Int64, false),
2663                    Field::new("sum", ArrowDataType::Int64, false),
2664                ])),
2665                false,
2666            ),
2667        ]);
2668
2669        // Tests for #1652 and #1654
2670        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2671
2672        for batch in projected_reader {
2673            let batch = batch.unwrap();
2674            assert_eq!(batch.schema().as_ref(), &expected_schema);
2675        }
2676    }
2677
2678    #[test]
2679    // same as test_read_structs but constructs projection mask via column names
2680    fn test_read_structs_by_name() {
2681        let testdata = arrow::util::test_util::parquet_test_data();
2682        let path = format!("{testdata}/nested_structs.rust.parquet");
2683        let file = File::open(&path).unwrap();
2684        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2685
2686        for batch in record_batch_reader {
2687            batch.unwrap();
2688        }
2689
2690        let file = File::open(&path).unwrap();
2691        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2692
2693        let mask = ProjectionMask::columns(
2694            builder.parquet_schema(),
2695            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2696        );
2697        let projected_reader = builder
2698            .with_projection(mask)
2699            .with_batch_size(60)
2700            .build()
2701            .unwrap();
2702
2703        let expected_schema = Schema::new(vec![
2704            Field::new(
2705                "roll_num",
2706                ArrowDataType::Struct(Fields::from(vec![Field::new(
2707                    "count",
2708                    ArrowDataType::UInt64,
2709                    false,
2710                )])),
2711                false,
2712            ),
2713            Field::new(
2714                "PC_CUR",
2715                ArrowDataType::Struct(Fields::from(vec![
2716                    Field::new("mean", ArrowDataType::Int64, false),
2717                    Field::new("sum", ArrowDataType::Int64, false),
2718                ])),
2719                false,
2720            ),
2721        ]);
2722
2723        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2724
2725        for batch in projected_reader {
2726            let batch = batch.unwrap();
2727            assert_eq!(batch.schema().as_ref(), &expected_schema);
2728        }
2729    }
2730
2731    #[test]
2732    fn test_read_maps() {
2733        let testdata = arrow::util::test_util::parquet_test_data();
2734        let path = format!("{testdata}/nested_maps.snappy.parquet");
2735        let file = File::open(path).unwrap();
2736        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2737
2738        for batch in record_batch_reader {
2739            batch.unwrap();
2740        }
2741    }
2742
2743    #[test]
2744    fn test_nested_nullability() {
2745        let message_type = "message nested {
2746          OPTIONAL Group group {
2747            REQUIRED INT32 leaf;
2748          }
2749        }";
2750
2751        let file = tempfile::tempfile().unwrap();
2752        let schema = Arc::new(parse_message_type(message_type).unwrap());
2753
2754        {
2755            // Write using low-level parquet API (#1167)
2756            let mut writer =
2757                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2758                    .unwrap();
2759
2760            {
2761                let mut row_group_writer = writer.next_row_group().unwrap();
2762                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2763
2764                column_writer
2765                    .typed::<Int32Type>()
2766                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2767                    .unwrap();
2768
2769                column_writer.close().unwrap();
2770                row_group_writer.close().unwrap();
2771            }
2772
2773            writer.close().unwrap();
2774        }
2775
2776        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2777        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2778
2779        let reader = builder.with_projection(mask).build().unwrap();
2780
2781        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2782            "group",
2783            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2784            true,
2785        )]));
2786
2787        let batch = reader.into_iter().next().unwrap().unwrap();
2788        assert_eq!(batch.schema().as_ref(), &expected_schema);
2789        assert_eq!(batch.num_rows(), 4);
2790        assert_eq!(batch.column(0).null_count(), 2);
2791    }
2792
    #[test]
    fn test_invalid_utf8() {
        // a parquet file with 1 column with invalid utf8
        // (the embedded string values contain a raw 0xFF (255) byte spliced
        // between ASCII bytes, which is never valid UTF-8)
        let data = vec![
            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
            82, 49,
        ];

        let file = Bytes::from(data);
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

        // Decoding must fail when the string column is validated as UTF-8
        let error = record_batch_reader.next().unwrap().unwrap_err();

        assert!(
            error.to_string().contains("invalid utf-8 sequence"),
            "{}",
            error
        );
    }
2822
    #[test]
    fn test_invalid_utf8_string_array() {
        // Exercise the invalid-utf8 cases with i32 offsets (StringArray)
        test_invalid_utf8_string_array_inner::<i32>();
    }
2827
    #[test]
    fn test_invalid_utf8_large_string_array() {
        // Exercise the invalid-utf8 cases with i64 offsets (LargeStringArray)
        test_invalid_utf8_string_array_inner::<i64>();
    }
2832
2833    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2834        let cases = [
2835            invalid_utf8_first_char::<O>(),
2836            invalid_utf8_first_char_long_strings::<O>(),
2837            invalid_utf8_later_char::<O>(),
2838            invalid_utf8_later_char_long_strings::<O>(),
2839            invalid_utf8_later_char_really_long_strings::<O>(),
2840            invalid_utf8_later_char_really_long_strings2::<O>(),
2841        ];
2842        for array in &cases {
2843            for encoding in STRING_ENCODINGS {
2844                // data is not valid utf8 we can not construct a correct StringArray
2845                // safely, so purposely create an invalid StringArray
2846                let array = unsafe {
2847                    GenericStringArray::<O>::new_unchecked(
2848                        array.offsets().clone(),
2849                        array.values().clone(),
2850                        array.nulls().cloned(),
2851                    )
2852                };
2853                let data_type = array.data_type().clone();
2854                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2855                let err = read_from_parquet(data).unwrap_err();
2856                let expected_err =
2857                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2858                assert!(
2859                    err.to_string().contains(expected_err),
2860                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2861                );
2862            }
2863        }
2864    }
2865
2866    #[test]
2867    fn test_invalid_utf8_string_view_array() {
2868        let cases = [
2869            invalid_utf8_first_char::<i32>(),
2870            invalid_utf8_first_char_long_strings::<i32>(),
2871            invalid_utf8_later_char::<i32>(),
2872            invalid_utf8_later_char_long_strings::<i32>(),
2873            invalid_utf8_later_char_really_long_strings::<i32>(),
2874            invalid_utf8_later_char_really_long_strings2::<i32>(),
2875        ];
2876
2877        for encoding in STRING_ENCODINGS {
2878            for array in &cases {
2879                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
2880                let array = array.as_binary_view();
2881
2882                // data is not valid utf8 we can not construct a correct StringArray
2883                // safely, so purposely create an invalid StringViewArray
2884                let array = unsafe {
2885                    StringViewArray::new_unchecked(
2886                        array.views().clone(),
2887                        array.data_buffers().to_vec(),
2888                        array.nulls().cloned(),
2889                    )
2890                };
2891
2892                let data_type = array.data_type().clone();
2893                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2894                let err = read_from_parquet(data).unwrap_err();
2895                let expected_err =
2896                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2897                assert!(
2898                    err.to_string().contains(expected_err),
2899                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2900                );
2901            }
2902        }
2903    }
2904
    /// Encodings suitable for string data
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None, // `None` means use the writer's default (dictionary) encoding
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];

    /// Invalid UTF-8 sequence in the first character
    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Invalid UTF-8 sequence in NOT the first character
    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
2920
2921    /// returns a BinaryArray with invalid UTF8 data in the first character
2922    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2923        let valid: &[u8] = b"   ";
2924        let invalid = INVALID_UTF8_FIRST_CHAR;
2925        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2926    }
2927
2928    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
2929    /// string larger than 12 bytes which is handled specially when reading
2930    /// `ByteViewArray`s
2931    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2932        let valid: &[u8] = b"   ";
2933        let mut invalid = vec![];
2934        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2935        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2936        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2937    }
2938
2939    /// returns a BinaryArray with invalid UTF8 data in a character other than
2940    /// the first (this is checked in a special codepath)
2941    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2942        let valid: &[u8] = b"   ";
2943        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
2944        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2945    }
2946
2947    /// returns a BinaryArray with invalid UTF8 data in a character other than
2948    /// the first in a string larger than 12 bytes which is handled specially
2949    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2950    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2951        let valid: &[u8] = b"   ";
2952        let mut invalid = vec![];
2953        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2954        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2955        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2956    }
2957
2958    /// returns a BinaryArray with invalid UTF8 data in a character other than
2959    /// the first in a string larger than 128 bytes which is handled specially
2960    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2961    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2962        let valid: &[u8] = b"   ";
2963        let mut invalid = vec![];
2964        for _ in 0..10 {
2965            // each instance is 38 bytes
2966            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2967        }
2968        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2969        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2970    }
2971
2972    /// returns a BinaryArray with small invalid UTF8 data followed by a large
2973    /// invalid UTF8 data in a character other than the first in a string larger
2974    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2975        let valid: &[u8] = b"   ";
2976        let mut valid_long = vec![];
2977        for _ in 0..10 {
2978            // each instance is 38 bytes
2979            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2980        }
2981        let invalid = INVALID_UTF8_LATER_CHAR;
2982        GenericBinaryArray::<O>::from_iter(vec![
2983            None,
2984            Some(valid),
2985            Some(invalid),
2986            None,
2987            Some(&valid_long),
2988            Some(valid),
2989        ])
2990    }
2991
2992    /// writes the array into a single column parquet file with the specified
2993    /// encoding.
2994    ///
2995    /// If no encoding is specified, use default (dictionary) encoding
2996    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
2997        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
2998        let mut data = vec![];
2999        let schema = batch.schema();
3000        let props = encoding.map(|encoding| {
3001            WriterProperties::builder()
3002                // must disable dictionary encoding to actually use encoding
3003                .set_dictionary_enabled(false)
3004                .set_encoding(encoding)
3005                .build()
3006        });
3007
3008        {
3009            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3010            writer.write(&batch).unwrap();
3011            writer.flush().unwrap();
3012            writer.close().unwrap();
3013        };
3014        data
3015    }
3016
3017    /// read the parquet file into a record batch
3018    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3019        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3020            .unwrap()
3021            .build()
3022            .unwrap();
3023
3024        reader.collect()
3025    }
3026
    #[test]
    fn test_dictionary_preservation() {
        // Single optional UTF8 (byte array) leaf column
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        // Request a dictionary-encoded arrow representation of the column
        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        // Two row groups of values; def level 1 => value, 0 => null, so the
        // row groups contain 8 and 9 logical rows respectively (17 total)
        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        // Read back with a batch size of 3: 17 rows => 6 batches
        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        // (row count, null count) per batch
        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        // Extracts the dictionary values array backing a batch's column
        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // First and second batch in same row group -> same dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // Third batch spans row group -> computed dictionary
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // Fourth, fifth and sixth from same row group -> same dictionary
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }
3119
3120    #[test]
3121    fn test_read_null_list() {
3122        let testdata = arrow::util::test_util::parquet_test_data();
3123        let path = format!("{testdata}/null_list.parquet");
3124        let file = File::open(path).unwrap();
3125        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3126
3127        let batch = record_batch_reader.next().unwrap().unwrap();
3128        assert_eq!(batch.num_rows(), 1);
3129        assert_eq!(batch.num_columns(), 1);
3130        assert_eq!(batch.column(0).len(), 1);
3131
3132        let list = batch
3133            .column(0)
3134            .as_any()
3135            .downcast_ref::<ListArray>()
3136            .unwrap();
3137        assert_eq!(list.len(), 1);
3138        assert!(list.is_valid(0));
3139
3140        let val = list.value(0);
3141        assert_eq!(val.len(), 0);
3142    }
3143
3144    #[test]
3145    fn test_null_schema_inference() {
3146        let testdata = arrow::util::test_util::parquet_test_data();
3147        let path = format!("{testdata}/null_list.parquet");
3148        let file = File::open(path).unwrap();
3149
3150        let arrow_field = Field::new(
3151            "emptylist",
3152            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3153            true,
3154        );
3155
3156        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3157        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3158        let schema = builder.schema();
3159        assert_eq!(schema.fields().len(), 1);
3160        assert_eq!(schema.field(0), &arrow_field);
3161    }
3162
3163    #[test]
3164    fn test_skip_metadata() {
3165        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3166        let field = Field::new("col", col.data_type().clone(), true);
3167
3168        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3169
3170        let metadata = [("key".to_string(), "value".to_string())]
3171            .into_iter()
3172            .collect();
3173
3174        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3175
3176        assert_ne!(schema_with_metadata, schema_without_metadata);
3177
3178        let batch =
3179            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3180
3181        let file = |version: WriterVersion| {
3182            let props = WriterProperties::builder()
3183                .set_writer_version(version)
3184                .build();
3185
3186            let file = tempfile().unwrap();
3187            let mut writer =
3188                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3189                    .unwrap();
3190            writer.write(&batch).unwrap();
3191            writer.close().unwrap();
3192            file
3193        };
3194
3195        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3196
3197        let v1_reader = file(WriterVersion::PARQUET_1_0);
3198        let v2_reader = file(WriterVersion::PARQUET_2_0);
3199
3200        let arrow_reader =
3201            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3202        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3203
3204        let reader =
3205            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3206                .unwrap()
3207                .build()
3208                .unwrap();
3209        assert_eq!(reader.schema(), schema_without_metadata);
3210
3211        let arrow_reader =
3212            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3213        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3214
3215        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3216            .unwrap()
3217            .build()
3218            .unwrap();
3219        assert_eq!(reader.schema(), schema_without_metadata);
3220    }
3221
3222    fn write_parquet_from_iter<I, F>(value: I) -> File
3223    where
3224        I: IntoIterator<Item = (F, ArrayRef)>,
3225        F: AsRef<str>,
3226    {
3227        let batch = RecordBatch::try_from_iter(value).unwrap();
3228        let file = tempfile().unwrap();
3229        let mut writer =
3230            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3231        writer.write(&batch).unwrap();
3232        writer.close().unwrap();
3233        file
3234    }
3235
3236    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3237    where
3238        I: IntoIterator<Item = (F, ArrayRef)>,
3239        F: AsRef<str>,
3240    {
3241        let file = write_parquet_from_iter(value);
3242        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3243        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3244            file.try_clone().unwrap(),
3245            options_with_schema,
3246        );
3247        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3248    }
3249
3250    #[test]
3251    fn test_schema_too_few_columns() {
3252        run_schema_test_with_error(
3253            vec![
3254                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3255                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3256            ],
3257            Arc::new(Schema::new(vec![Field::new(
3258                "int64",
3259                ArrowDataType::Int64,
3260                false,
3261            )])),
3262            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3263        );
3264    }
3265
3266    #[test]
3267    fn test_schema_too_many_columns() {
3268        run_schema_test_with_error(
3269            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3270            Arc::new(Schema::new(vec![
3271                Field::new("int64", ArrowDataType::Int64, false),
3272                Field::new("int32", ArrowDataType::Int32, false),
3273            ])),
3274            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3275        );
3276    }
3277
3278    #[test]
3279    fn test_schema_mismatched_column_names() {
3280        run_schema_test_with_error(
3281            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3282            Arc::new(Schema::new(vec![Field::new(
3283                "other",
3284                ArrowDataType::Int64,
3285                false,
3286            )])),
3287            "Arrow: incompatible arrow schema, expected field named int64 got other",
3288        );
3289    }
3290
3291    #[test]
3292    fn test_schema_incompatible_columns() {
3293        run_schema_test_with_error(
3294            vec![
3295                (
3296                    "col1_invalid",
3297                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3298                ),
3299                (
3300                    "col2_valid",
3301                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3302                ),
3303                (
3304                    "col3_invalid",
3305                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3306                ),
3307            ],
3308            Arc::new(Schema::new(vec![
3309                Field::new("col1_invalid", ArrowDataType::Int32, false),
3310                Field::new("col2_valid", ArrowDataType::Int32, false),
3311                Field::new("col3_invalid", ArrowDataType::Int32, false),
3312            ])),
3313            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3314        );
3315    }
3316
3317    #[test]
3318    fn test_one_incompatible_nested_column() {
3319        let nested_fields = Fields::from(vec![
3320            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3321            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3322        ]);
3323        let nested = StructArray::try_new(
3324            nested_fields,
3325            vec![
3326                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3327                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3328            ],
3329            None,
3330        )
3331        .expect("struct array");
3332        let supplied_nested_fields = Fields::from(vec![
3333            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3334            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3335        ]);
3336        run_schema_test_with_error(
3337            vec![
3338                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3339                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3340                ("nested", Arc::new(nested) as ArrayRef),
3341            ],
3342            Arc::new(Schema::new(vec![
3343                Field::new("col1", ArrowDataType::Int64, false),
3344                Field::new("col2", ArrowDataType::Int32, false),
3345                Field::new(
3346                    "nested",
3347                    ArrowDataType::Struct(supplied_nested_fields),
3348                    false,
3349                ),
3350            ])),
3351            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3352        );
3353    }
3354
3355    #[test]
3356    fn test_read_binary_as_utf8() {
3357        let file = write_parquet_from_iter(vec![
3358            (
3359                "binary_to_utf8",
3360                Arc::new(BinaryArray::from(vec![
3361                    b"one".as_ref(),
3362                    b"two".as_ref(),
3363                    b"three".as_ref(),
3364                ])) as ArrayRef,
3365            ),
3366            (
3367                "large_binary_to_large_utf8",
3368                Arc::new(LargeBinaryArray::from(vec![
3369                    b"one".as_ref(),
3370                    b"two".as_ref(),
3371                    b"three".as_ref(),
3372                ])) as ArrayRef,
3373            ),
3374            (
3375                "binary_view_to_utf8_view",
3376                Arc::new(BinaryViewArray::from(vec![
3377                    b"one".as_ref(),
3378                    b"two".as_ref(),
3379                    b"three".as_ref(),
3380                ])) as ArrayRef,
3381            ),
3382        ]);
3383        let supplied_fields = Fields::from(vec![
3384            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3385            Field::new(
3386                "large_binary_to_large_utf8",
3387                ArrowDataType::LargeUtf8,
3388                false,
3389            ),
3390            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3391        ]);
3392
3393        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3394        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3395            file.try_clone().unwrap(),
3396            options,
3397        )
3398        .expect("reader builder with schema")
3399        .build()
3400        .expect("reader with schema");
3401
3402        let batch = arrow_reader.next().unwrap().unwrap();
3403        assert_eq!(batch.num_columns(), 3);
3404        assert_eq!(batch.num_rows(), 3);
3405        assert_eq!(
3406            batch
3407                .column(0)
3408                .as_string::<i32>()
3409                .iter()
3410                .collect::<Vec<_>>(),
3411            vec![Some("one"), Some("two"), Some("three")]
3412        );
3413
3414        assert_eq!(
3415            batch
3416                .column(1)
3417                .as_string::<i64>()
3418                .iter()
3419                .collect::<Vec<_>>(),
3420            vec![Some("one"), Some("two"), Some("three")]
3421        );
3422
3423        assert_eq!(
3424            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3425            vec![Some("one"), Some("two"), Some("three")]
3426        );
3427    }
3428
    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        // Overriding a binary column to Utf8 must not silently accept bytes
        // that are not valid UTF-8: the expected outcome is the panic message
        // asserted by `should_panic` above.
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            // Deliberately invalid UTF-8 byte sequences
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        // NOTE(review): the panic appears to be raised during decoding inside
        // `next()` rather than surfaced as the `Err` consumed here — confirm
        arrow_reader.next().unwrap().unwrap_err();
    }
3456
    #[test]
    fn test_with_schema() {
        // End-to-end check of schema overrides that require casting on read:
        // int32 -> timestamp(second, +01:00), date32 -> date64, and inside a
        // nested struct: utf8 -> dictionary<int32, utf8> and
        // int64 -> timestamp(nanosecond, +10:00).
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        // The schema the caller asks the reader to produce
        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        // The reader should report the supplied schema verbatim
        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        // int32 value 0 rendered in the +01:00 zone
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        // date32 value 0 widened to date64
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        // Dictionary values are the distinct strings, keys index into them
        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        // int64 value 1 interpreted as nanoseconds in the +10:00 zone
        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }
3592
3593    #[test]
3594    fn test_empty_projection() {
3595        let testdata = arrow::util::test_util::parquet_test_data();
3596        let path = format!("{testdata}/alltypes_plain.parquet");
3597        let file = File::open(path).unwrap();
3598
3599        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3600        let file_metadata = builder.metadata().file_metadata();
3601        let expected_rows = file_metadata.num_rows() as usize;
3602
3603        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3604        let batch_reader = builder
3605            .with_projection(mask)
3606            .with_batch_size(2)
3607            .build()
3608            .unwrap();
3609
3610        let mut total_rows = 0;
3611        for maybe_batch in batch_reader {
3612            let batch = maybe_batch.unwrap();
3613            total_rows += batch.num_rows();
3614            assert_eq!(batch.num_columns(), 0);
3615            assert!(batch.num_rows() <= 2);
3616        }
3617
3618        assert_eq!(total_rows, expected_rows);
3619    }
3620
3621    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3622        let schema = Arc::new(Schema::new(vec![Field::new(
3623            "list",
3624            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3625            true,
3626        )]));
3627
3628        let mut buf = Vec::with_capacity(1024);
3629
3630        let mut writer = ArrowWriter::try_new(
3631            &mut buf,
3632            schema.clone(),
3633            Some(
3634                WriterProperties::builder()
3635                    .set_max_row_group_size(row_group_size)
3636                    .build(),
3637            ),
3638        )
3639        .unwrap();
3640        for _ in 0..2 {
3641            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3642            for _ in 0..(batch_size) {
3643                list_builder.append(true);
3644            }
3645            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3646                .unwrap();
3647            writer.write(&batch).unwrap();
3648        }
3649        writer.close().unwrap();
3650
3651        let mut record_reader =
3652            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3653        assert_eq!(
3654            batch_size,
3655            record_reader.next().unwrap().unwrap().num_rows()
3656        );
3657        assert_eq!(
3658            batch_size,
3659            record_reader.next().unwrap().unwrap().num_rows()
3660        );
3661    }
3662
3663    #[test]
3664    fn test_row_group_exact_multiple() {
3665        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3666        test_row_group_batch(8, 8);
3667        test_row_group_batch(10, 8);
3668        test_row_group_batch(8, 10);
3669        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3670        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3671        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3672        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3673        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3674    }
3675
3676    /// Given a RecordBatch containing all the column data, return the expected batches given
3677    /// a `batch_size` and `selection`
3678    fn get_expected_batches(
3679        column: &RecordBatch,
3680        selection: &RowSelection,
3681        batch_size: usize,
3682    ) -> Vec<RecordBatch> {
3683        let mut expected_batches = vec![];
3684
3685        let mut selection: VecDeque<_> = selection.clone().into();
3686        let mut row_offset = 0;
3687        let mut last_start = None;
3688        while row_offset < column.num_rows() && !selection.is_empty() {
3689            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3690            while batch_remaining > 0 && !selection.is_empty() {
3691                let (to_read, skip) = match selection.front_mut() {
3692                    Some(selection) if selection.row_count > batch_remaining => {
3693                        selection.row_count -= batch_remaining;
3694                        (batch_remaining, selection.skip)
3695                    }
3696                    Some(_) => {
3697                        let select = selection.pop_front().unwrap();
3698                        (select.row_count, select.skip)
3699                    }
3700                    None => break,
3701                };
3702
3703                batch_remaining -= to_read;
3704
3705                match skip {
3706                    true => {
3707                        if let Some(last_start) = last_start.take() {
3708                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3709                        }
3710                        row_offset += to_read
3711                    }
3712                    false => {
3713                        last_start.get_or_insert(row_offset);
3714                        row_offset += to_read
3715                    }
3716                }
3717            }
3718        }
3719
3720        if let Some(last_start) = last_start.take() {
3721            expected_batches.push(column.slice(last_start, row_offset - last_start))
3722        }
3723
3724        // Sanity check, all batches except the final should be the batch size
3725        for batch in &expected_batches[..expected_batches.len() - 1] {
3726            assert_eq!(batch.num_rows(), batch_size);
3727        }
3728
3729        expected_batches
3730    }
3731
3732    fn create_test_selection(
3733        step_len: usize,
3734        total_len: usize,
3735        skip_first: bool,
3736    ) -> (RowSelection, usize) {
3737        let mut remaining = total_len;
3738        let mut skip = skip_first;
3739        let mut vec = vec![];
3740        let mut selected_count = 0;
3741        while remaining != 0 {
3742            let step = if remaining > step_len {
3743                step_len
3744            } else {
3745                remaining
3746            };
3747            vec.push(RowSelector {
3748                row_count: step,
3749                skip,
3750            });
3751            remaining -= step;
3752            if !skip {
3753                selected_count += step;
3754            }
3755            skip = !skip;
3756        }
3757        (vec.into(), selected_count)
3758    }
3759
3760    #[test]
3761    fn test_scan_row_with_selection() {
3762        let testdata = arrow::util::test_util::parquet_test_data();
3763        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3764        let test_file = File::open(&path).unwrap();
3765
3766        let mut serial_reader =
3767            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3768        let data = serial_reader.next().unwrap().unwrap();
3769
3770        let do_test = |batch_size: usize, selection_len: usize| {
3771            for skip_first in [false, true] {
3772                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3773
3774                let expected = get_expected_batches(&data, &selections, batch_size);
3775                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3776                assert_eq!(
3777                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3778                    expected,
3779                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3780                );
3781            }
3782        };
3783
3784        // total row count 7300
3785        // 1. test selection len more than one page row count
3786        do_test(1000, 1000);
3787
3788        // 2. test selection len less than one page row count
3789        do_test(20, 20);
3790
3791        // 3. test selection_len less than batch_size
3792        do_test(20, 5);
3793
3794        // 4. test selection_len more than batch_size
3795        // If batch_size < selection_len
3796        do_test(20, 5);
3797
3798        fn create_skip_reader(
3799            test_file: &File,
3800            batch_size: usize,
3801            selections: RowSelection,
3802        ) -> ParquetRecordBatchReader {
3803            let options = ArrowReaderOptions::new().with_page_index(true);
3804            let file = test_file.try_clone().unwrap();
3805            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3806                .unwrap()
3807                .with_batch_size(batch_size)
3808                .with_row_selection(selections)
3809                .build()
3810                .unwrap()
3811        }
3812    }
3813
3814    #[test]
3815    fn test_batch_size_overallocate() {
3816        let testdata = arrow::util::test_util::parquet_test_data();
3817        // `alltypes_plain.parquet` only have 8 rows
3818        let path = format!("{testdata}/alltypes_plain.parquet");
3819        let test_file = File::open(path).unwrap();
3820
3821        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3822        let num_rows = builder.metadata.file_metadata().num_rows();
3823        let reader = builder
3824            .with_batch_size(1024)
3825            .with_projection(ProjectionMask::all())
3826            .build()
3827            .unwrap();
3828        assert_ne!(1024, num_rows);
3829        assert_eq!(reader.batch_size, num_rows as usize);
3830    }
3831
3832    #[test]
3833    fn test_read_with_page_index_enabled() {
3834        let testdata = arrow::util::test_util::parquet_test_data();
3835
3836        {
3837            // `alltypes_tiny_pages.parquet` has page index
3838            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3839            let test_file = File::open(path).unwrap();
3840            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3841                test_file,
3842                ArrowReaderOptions::new().with_page_index(true),
3843            )
3844            .unwrap();
3845            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3846            let reader = builder.build().unwrap();
3847            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3848            assert_eq!(batches.len(), 8);
3849        }
3850
3851        {
3852            // `alltypes_plain.parquet` doesn't have page index
3853            let path = format!("{testdata}/alltypes_plain.parquet");
3854            let test_file = File::open(path).unwrap();
3855            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3856                test_file,
3857                ArrowReaderOptions::new().with_page_index(true),
3858            )
3859            .unwrap();
3860            // Although `Vec<Vec<PageLoacation>>` of each row group is empty,
3861            // we should read the file successfully.
3862            assert!(builder.metadata().offset_index().is_none());
3863            let reader = builder.build().unwrap();
3864            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3865            assert_eq!(batches.len(), 1);
3866        }
3867    }
3868
3869    #[test]
3870    fn test_raw_repetition() {
3871        const MESSAGE_TYPE: &str = "
3872            message Log {
3873              OPTIONAL INT32 eventType;
3874              REPEATED INT32 category;
3875              REPEATED group filter {
3876                OPTIONAL INT32 error;
3877              }
3878            }
3879        ";
3880        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
3881        let props = Default::default();
3882
3883        let mut buf = Vec::with_capacity(1024);
3884        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
3885        let mut row_group_writer = writer.next_row_group().unwrap();
3886
3887        // column 0
3888        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3889        col_writer
3890            .typed::<Int32Type>()
3891            .write_batch(&[1], Some(&[1]), None)
3892            .unwrap();
3893        col_writer.close().unwrap();
3894        // column 1
3895        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3896        col_writer
3897            .typed::<Int32Type>()
3898            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
3899            .unwrap();
3900        col_writer.close().unwrap();
3901        // column 2
3902        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3903        col_writer
3904            .typed::<Int32Type>()
3905            .write_batch(&[1], Some(&[1]), Some(&[0]))
3906            .unwrap();
3907        col_writer.close().unwrap();
3908
3909        let rg_md = row_group_writer.close().unwrap();
3910        assert_eq!(rg_md.num_rows(), 1);
3911        writer.close().unwrap();
3912
3913        let bytes = Bytes::from(buf);
3914
3915        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
3916        let full = no_mask.next().unwrap().unwrap();
3917
3918        assert_eq!(full.num_columns(), 3);
3919
3920        for idx in 0..3 {
3921            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
3922            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
3923            let mut reader = b.with_projection(mask).build().unwrap();
3924            let projected = reader.next().unwrap().unwrap();
3925
3926            assert_eq!(projected.num_columns(), 1);
3927            assert_eq!(full.column(idx), projected.column(0));
3928        }
3929    }
3930
3931    #[test]
3932    fn test_read_lz4_raw() {
3933        let testdata = arrow::util::test_util::parquet_test_data();
3934        let path = format!("{testdata}/lz4_raw_compressed.parquet");
3935        let file = File::open(path).unwrap();
3936
3937        let batches = ParquetRecordBatchReader::try_new(file, 1024)
3938            .unwrap()
3939            .collect::<Result<Vec<_>, _>>()
3940            .unwrap();
3941        assert_eq!(batches.len(), 1);
3942        let batch = &batches[0];
3943
3944        assert_eq!(batch.num_columns(), 3);
3945        assert_eq!(batch.num_rows(), 4);
3946
3947        // https://github.com/apache/parquet-testing/pull/18
3948        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3949        assert_eq!(
3950            a.values(),
3951            &[1593604800, 1593604800, 1593604801, 1593604801]
3952        );
3953
3954        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3955        let a: Vec<_> = a.iter().flatten().collect();
3956        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
3957
3958        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3959        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
3960    }
3961
3962    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
3963    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
3964    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
3965    //    LZ4_HADOOP algorithm for compression.
3966    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
3967    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
3968    //    older parquet-cpp versions.
3969    //
3970    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
3971    #[test]
3972    fn test_read_lz4_hadoop_fallback() {
3973        for file in [
3974            "hadoop_lz4_compressed.parquet",
3975            "non_hadoop_lz4_compressed.parquet",
3976        ] {
3977            let testdata = arrow::util::test_util::parquet_test_data();
3978            let path = format!("{testdata}/{file}");
3979            let file = File::open(path).unwrap();
3980            let expected_rows = 4;
3981
3982            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3983                .unwrap()
3984                .collect::<Result<Vec<_>, _>>()
3985                .unwrap();
3986            assert_eq!(batches.len(), 1);
3987            let batch = &batches[0];
3988
3989            assert_eq!(batch.num_columns(), 3);
3990            assert_eq!(batch.num_rows(), expected_rows);
3991
3992            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3993            assert_eq!(
3994                a.values(),
3995                &[1593604800, 1593604800, 1593604801, 1593604801]
3996            );
3997
3998            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3999            let b: Vec<_> = b.iter().flatten().collect();
4000            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4001
4002            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4003            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4004        }
4005    }
4006
4007    #[test]
4008    fn test_read_lz4_hadoop_large() {
4009        let testdata = arrow::util::test_util::parquet_test_data();
4010        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4011        let file = File::open(path).unwrap();
4012        let expected_rows = 10000;
4013
4014        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4015            .unwrap()
4016            .collect::<Result<Vec<_>, _>>()
4017            .unwrap();
4018        assert_eq!(batches.len(), 1);
4019        let batch = &batches[0];
4020
4021        assert_eq!(batch.num_columns(), 1);
4022        assert_eq!(batch.num_rows(), expected_rows);
4023
4024        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4025        let a: Vec<_> = a.iter().flatten().collect();
4026        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4027        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4028        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4029        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4030    }
4031
4032    #[test]
4033    #[cfg(feature = "snap")]
4034    fn test_read_nested_lists() {
4035        let testdata = arrow::util::test_util::parquet_test_data();
4036        let path = format!("{testdata}/nested_lists.snappy.parquet");
4037        let file = File::open(path).unwrap();
4038
4039        let f = file.try_clone().unwrap();
4040        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4041        let expected = reader.next().unwrap().unwrap();
4042        assert_eq!(expected.num_rows(), 3);
4043
4044        let selection = RowSelection::from(vec![
4045            RowSelector::skip(1),
4046            RowSelector::select(1),
4047            RowSelector::skip(1),
4048        ]);
4049        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4050            .unwrap()
4051            .with_row_selection(selection)
4052            .build()
4053            .unwrap();
4054
4055        let actual = reader.next().unwrap().unwrap();
4056        assert_eq!(actual.num_rows(), 1);
4057        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4058    }
4059
4060    #[test]
4061    fn test_arbitrary_decimal() {
4062        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4063        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4064            .with_precision_and_scale(19, 0)
4065            .unwrap();
4066        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4067            .with_precision_and_scale(12, 0)
4068            .unwrap();
4069        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4070            .with_precision_and_scale(17, 10)
4071            .unwrap();
4072
4073        let written = RecordBatch::try_from_iter([
4074            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4075            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4076            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4077        ])
4078        .unwrap();
4079
4080        let mut buffer = Vec::with_capacity(1024);
4081        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4082        writer.write(&written).unwrap();
4083        writer.close().unwrap();
4084
4085        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4086            .unwrap()
4087            .collect::<Result<Vec<_>, _>>()
4088            .unwrap();
4089
4090        assert_eq!(&written.slice(0, 8), &read[0]);
4091    }
4092
4093    #[test]
4094    fn test_list_skip() {
4095        let mut list = ListBuilder::new(Int32Builder::new());
4096        list.append_value([Some(1), Some(2)]);
4097        list.append_value([Some(3)]);
4098        list.append_value([Some(4)]);
4099        let list = list.finish();
4100        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4101
4102        // First page contains 2 values but only 1 row
4103        let props = WriterProperties::builder()
4104            .set_data_page_row_count_limit(1)
4105            .set_write_batch_size(2)
4106            .build();
4107
4108        let mut buffer = Vec::with_capacity(1024);
4109        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4110        writer.write(&batch).unwrap();
4111        writer.close().unwrap();
4112
4113        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4114        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4115            .unwrap()
4116            .with_row_selection(selection.into())
4117            .build()
4118            .unwrap();
4119        let out = reader.next().unwrap().unwrap();
4120        assert_eq!(out.num_rows(), 1);
4121        assert_eq!(out, batch.slice(2, 1));
4122    }
4123
4124    fn test_decimal_roundtrip<T: DecimalType>() {
4125        // Precision <= 9 -> INT32
4126        // Precision <= 18 -> INT64
4127        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4128
4129        let d = |values: Vec<usize>, p: u8| {
4130            let iter = values.into_iter().map(T::Native::usize_as);
4131            PrimitiveArray::<T>::from_iter_values(iter)
4132                .with_precision_and_scale(p, 2)
4133                .unwrap()
4134        };
4135
4136        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4137        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4138        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4139        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4140
4141        let batch = RecordBatch::try_from_iter([
4142            ("d1", Arc::new(d1) as ArrayRef),
4143            ("d2", Arc::new(d2) as ArrayRef),
4144            ("d3", Arc::new(d3) as ArrayRef),
4145            ("d4", Arc::new(d4) as ArrayRef),
4146        ])
4147        .unwrap();
4148
4149        let mut buffer = Vec::with_capacity(1024);
4150        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4151        writer.write(&batch).unwrap();
4152        writer.close().unwrap();
4153
4154        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4155        let t1 = builder.parquet_schema().columns()[0].physical_type();
4156        assert_eq!(t1, PhysicalType::INT32);
4157        let t2 = builder.parquet_schema().columns()[1].physical_type();
4158        assert_eq!(t2, PhysicalType::INT64);
4159        let t3 = builder.parquet_schema().columns()[2].physical_type();
4160        assert_eq!(t3, PhysicalType::INT64);
4161        let t4 = builder.parquet_schema().columns()[3].physical_type();
4162        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4163
4164        let mut reader = builder.build().unwrap();
4165        assert_eq!(batch.schema(), reader.schema());
4166
4167        let out = reader.next().unwrap().unwrap();
4168        assert_eq!(batch, out);
4169    }
4170
4171    #[test]
4172    fn test_decimal() {
4173        test_decimal_roundtrip::<Decimal128Type>();
4174        test_decimal_roundtrip::<Decimal256Type>();
4175    }
4176
4177    #[test]
4178    fn test_list_selection() {
4179        let schema = Arc::new(Schema::new(vec![Field::new_list(
4180            "list",
4181            Field::new_list_field(ArrowDataType::Utf8, true),
4182            false,
4183        )]));
4184        let mut buf = Vec::with_capacity(1024);
4185
4186        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4187
4188        for i in 0..2 {
4189            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4190            for j in 0..1024 {
4191                list_a_builder.values().append_value(format!("{i} {j}"));
4192                list_a_builder.append(true);
4193            }
4194            let batch =
4195                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4196                    .unwrap();
4197            writer.write(&batch).unwrap();
4198        }
4199        let _metadata = writer.close().unwrap();
4200
4201        let buf = Bytes::from(buf);
4202        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4203            .unwrap()
4204            .with_row_selection(RowSelection::from(vec![
4205                RowSelector::skip(100),
4206                RowSelector::select(924),
4207                RowSelector::skip(100),
4208                RowSelector::select(924),
4209            ]))
4210            .build()
4211            .unwrap();
4212
4213        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4214        let batch = concat_batches(&schema, &batches).unwrap();
4215
4216        assert_eq!(batch.num_rows(), 924 * 2);
4217        let list = batch.column(0).as_list::<i32>();
4218
4219        for w in list.value_offsets().windows(2) {
4220            assert_eq!(w[0] + 1, w[1])
4221        }
4222        let mut values = list.values().as_string::<i32>().iter();
4223
4224        for i in 0..2 {
4225            for j in 100..1024 {
4226                let expected = format!("{i} {j}");
4227                assert_eq!(values.next().unwrap().unwrap(), &expected);
4228            }
4229        }
4230    }
4231
    #[test]
    fn test_list_selection_fuzz() {
        // Fuzz-tests row selection over a doubly-nested list column
        // (list<list<int32>>) whose null/length structure is randomized at
        // every nesting level, across several batch sizes and skip/select
        // patterns. NOTE: the rng call order below determines the generated
        // data, so statement order matters.
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        // Generate 2048 outer rows; each level is null with probability 0.2
        // and otherwise has a random length in 0..10
        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                // Null outer list
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    // Null inner list
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    // Leaf values are null with probability 0.2
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        // Selection patterns covering interior skips, leading/trailing skips,
        // and single-row selections at batch boundaries
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Walk the selection, comparing each selected run against the
                // corresponding slice of the originally-written batch
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
4345
4346    #[test]
4347    fn test_read_old_nested_list() {
4348        use arrow::datatypes::DataType;
4349        use arrow::datatypes::ToByteSlice;
4350
4351        let testdata = arrow::util::test_util::parquet_test_data();
4352        // message my_record {
4353        //     REQUIRED group a (LIST) {
4354        //         REPEATED group array (LIST) {
4355        //             REPEATED INT32 array;
4356        //         }
4357        //     }
4358        // }
4359        // should be read as list<list<int32>>
4360        let path = format!("{testdata}/old_list_structure.parquet");
4361        let test_file = File::open(path).unwrap();
4362
4363        // create expected ListArray
4364        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4365
4366        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4367        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4368
4369        // Construct a list array from the above two
4370        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4371            "array",
4372            DataType::Int32,
4373            false,
4374        ))))
4375        .len(2)
4376        .add_buffer(a_value_offsets)
4377        .add_child_data(a_values.into_data())
4378        .build()
4379        .unwrap();
4380        let a = ListArray::from(a_list_data);
4381
4382        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4383        let mut reader = builder.build().unwrap();
4384        let out = reader.next().unwrap().unwrap();
4385        assert_eq!(out.num_rows(), 1);
4386        assert_eq!(out.num_columns(), 1);
4387        // grab first column
4388        let c0 = out.column(0);
4389        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4390        // get first row: [[1, 2], [3, 4]]
4391        let r0 = c0arr.value(0);
4392        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4393        assert_eq!(r0arr, &a);
4394    }
4395
4396    #[test]
4397    fn test_map_no_value() {
4398        // File schema:
4399        // message schema {
4400        //   required group my_map (MAP) {
4401        //     repeated group key_value {
4402        //       required int32 key;
4403        //       optional int32 value;
4404        //     }
4405        //   }
4406        //   required group my_map_no_v (MAP) {
4407        //     repeated group key_value {
4408        //       required int32 key;
4409        //     }
4410        //   }
4411        //   required group my_list (LIST) {
4412        //     repeated group list {
4413        //       required int32 element;
4414        //     }
4415        //   }
4416        // }
4417        let testdata = arrow::util::test_util::parquet_test_data();
4418        let path = format!("{testdata}/map_no_value.parquet");
4419        let file = File::open(path).unwrap();
4420
4421        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4422            .unwrap()
4423            .build()
4424            .unwrap();
4425        let out = reader.next().unwrap().unwrap();
4426        assert_eq!(out.num_rows(), 3);
4427        assert_eq!(out.num_columns(), 3);
4428        // my_map_no_v and my_list columns should now be equivalent
4429        let c0 = out.column(1).as_list::<i32>();
4430        let c1 = out.column(2).as_list::<i32>();
4431        assert_eq!(c0.len(), c1.len());
4432        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4433    }
4434}