parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a reader that reads Parquet data into Arrow [`RecordBatch`]es
19
20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
32use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
33use crate::column::page::{PageIterator, PageReader};
34#[cfg(feature = "encryption")]
35use crate::encryption::decrypt::FileDecryptionProperties;
36use crate::errors::{ParquetError, Result};
37use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
38use crate::file::reader::{ChunkReader, SerializedPageReader};
39use crate::schema::types::SchemaDescriptor;
40
41pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
42
43mod filter;
44mod read_plan;
45mod selection;
46pub mod statistics;
47
48/// Builder for constructing Parquet readers that decode into [Apache Arrow]
49/// arrays.
50///
51/// Most users should use one of the following specializations:
52///
53/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
54/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
55///
56/// # Features
57/// * Projection pushdown: [`Self::with_projection`]
58/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Limit and offset: [`Self::with_limit`] and [`Self::with_offset`]
60/// * Row group filtering: [`Self::with_row_groups`]
61/// * Range filtering: [`Self::with_row_selection`]
62/// * Row level filtering: [`Self::with_row_filter`]
63///
64/// # Implementing Predicate Pushdown
65///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the lowest-level optimizations.
68///
69/// However, most Parquet based systems will apply filters at many steps prior
70/// to decoding such as pruning files, row groups and data pages. This crate
71/// provides the low level APIs needed to implement such filtering, but does not
72/// include any logic to actually evaluate predicates. For example:
73///
74/// * [`Self::with_row_groups`] for Row Group pruning
75/// * [`Self::with_row_selection`] for data page pruning
76/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
77///
78/// The rationale for this design is that implementing predicate pushdown is a
79/// complex topic and varies significantly from system to system. For example
80///
/// 1. Which predicates are supported (e.g. prefix matching, user-defined functions, etc.)
82/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
83/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
84/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
85///
86/// You can read more about this design in the [Querying Parquet with
87/// Millisecond Latency] Arrow blog post.
88///
89/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
90/// [Apache Arrow]: https://arrow.apache.org/
91/// [`StatisticsConverter`]: statistics::StatisticsConverter
92/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
93pub struct ArrowReaderBuilder<T> {
94    pub(crate) input: T,
95
96    pub(crate) metadata: Arc<ParquetMetaData>,
97
98    pub(crate) schema: SchemaRef,
99
100    pub(crate) fields: Option<Arc<ParquetField>>,
101
102    pub(crate) batch_size: usize,
103
104    pub(crate) row_groups: Option<Vec<usize>>,
105
106    pub(crate) projection: ProjectionMask,
107
108    pub(crate) filter: Option<RowFilter>,
109
110    pub(crate) selection: Option<RowSelection>,
111
112    pub(crate) limit: Option<usize>,
113
114    pub(crate) offset: Option<usize>,
115}
116
117impl<T: Debug> Debug for ArrowReaderBuilder<T> {
118    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("ArrowReaderBuilder<T>")
120            .field("input", &self.input)
121            .field("metadata", &self.metadata)
122            .field("schema", &self.schema)
123            .field("fields", &self.fields)
124            .field("batch_size", &self.batch_size)
125            .field("row_groups", &self.row_groups)
126            .field("projection", &self.projection)
127            .field("filter", &self.filter)
128            .field("selection", &self.selection)
129            .field("limit", &self.limit)
130            .field("offset", &self.offset)
131            .finish()
132    }
133}
134
135impl<T> ArrowReaderBuilder<T> {
136    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
137        Self {
138            input,
139            metadata: metadata.metadata,
140            schema: metadata.schema,
141            fields: metadata.fields,
142            batch_size: 1024,
143            row_groups: None,
144            projection: ProjectionMask::all(),
145            filter: None,
146            selection: None,
147            limit: None,
148            offset: None,
149        }
150    }
151
152    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
153    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
154        &self.metadata
155    }
156
157    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
158    pub fn parquet_schema(&self) -> &SchemaDescriptor {
159        self.metadata.file_metadata().schema_descr()
160    }
161
162    /// Returns the arrow [`SchemaRef`] for this parquet file
163    pub fn schema(&self) -> &SchemaRef {
164        &self.schema
165    }
166
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If `batch_size` exceeds the number of rows in the file, it is clamped to the file's row count.
169    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
171        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
172        Self { batch_size, ..self }
173    }
174
175    /// Only read data from the provided row group indexes
176    ///
177    /// This is also called row group filtering
178    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
179        Self {
180            row_groups: Some(row_groups),
181            ..self
182        }
183    }
184
185    /// Only read data from the provided column indexes
186    pub fn with_projection(self, mask: ProjectionMask) -> Self {
187        Self {
188            projection: mask,
189            ..self
190        }
191    }
192
193    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
194    /// data into memory.
195    ///
196    /// This feature is used to restrict which rows are decoded within row
197    /// groups, skipping ranges of rows that are not needed. Such selections
198    /// could be determined by evaluating predicates against the parquet page
199    /// [`Index`] or some other external information available to a query
200    /// engine.
201    ///
202    /// # Notes
203    ///
204    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
205    /// applying the row selection, and therefore rows from skipped row groups
206    /// should not be included in the [`RowSelection`] (see example below)
207    ///
208    /// It is recommended to enable writing the page index if using this
209    /// functionality, to allow more efficient skipping over data pages. See
210    /// [`ArrowReaderOptions::with_page_index`].
211    ///
212    /// # Example
213    ///
214    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
215    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
216    /// row group 3:
217    ///
218    /// ```text
219    ///   Row Group 0, 1000 rows (selected)
220    ///   Row Group 1, 1000 rows (skipped)
221    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
222    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
223    /// ```
224    ///
225    /// You could pass the following [`RowSelection`]:
226    ///
227    /// ```text
228    ///  Select 1000    (scan all rows in row group 0)
229    ///  Skip 50        (skip the first 50 rows in row group 2)
230    ///  Select 50      (scan rows 50-100 in row group 2)
231    ///  Skip 900       (skip the remaining rows in row group 2)
232    ///  Skip 200       (skip the first 200 rows in row group 3)
233    ///  Select 100     (scan rows 200-300 in row group 3)
234    ///  Skip 700       (skip the remaining rows in row group 3)
235    /// ```
236    /// Note there is no entry for the (entirely) skipped row group 1.
237    ///
238    /// Note you can represent the same selection with fewer entries. Instead of
239    ///
240    /// ```text
241    ///  Skip 900       (skip the remaining rows in row group 2)
242    ///  Skip 200       (skip the first 200 rows in row group 3)
243    /// ```
244    ///
245    /// you could use
246    ///
247    /// ```text
248    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
249    /// ```
250    ///
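    /// For illustration, the condensed selection above could be constructed like
    /// this (a sketch; the counts assume the row group sizes shown above):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```
    ///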
251    /// [`Index`]: crate::file::page_index::index::Index
252    pub fn with_row_selection(self, selection: RowSelection) -> Self {
253        Self {
254            selection: Some(selection),
255            ..self
256        }
257    }
258
259    /// Provide a [`RowFilter`] to skip decoding rows
260    ///
261    /// Row filters are applied after row group selection and row selection
262    ///
263    /// It is recommended to enable reading the page index if using this functionality, to allow
264    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
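    ///
    /// # Example
    ///
    /// A minimal sketch of a row filter that keeps only rows whose first projected
    /// column is greater than 1 (the in-memory file and column name here are
    /// illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{BooleanArray, Int32Array, RecordBatch};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    /// # // Write a single Int32 column to an in-memory buffer
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(buf);
    /// // Only decode rows where the first (and only) column is > 1
    /// let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
    ///     let col = batch.column(0).as_primitive::<Int32Type>();
    ///     Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|v| v > 1))))
    /// });
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// assert_eq!(batches[0].num_rows(), 2);
    /// ```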
265    pub fn with_row_filter(self, filter: RowFilter) -> Self {
266        Self {
267            filter: Some(filter),
268            ..self
269        }
270    }
271
272    /// Provide a limit to the number of rows to be read
273    ///
274    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
275    /// allowing it to limit the final set of rows decoded after any pushed down predicates
276    ///
277    /// It is recommended to enable reading the page index if using this functionality, to allow
278    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
279    pub fn with_limit(self, limit: usize) -> Self {
280        Self {
281            limit: Some(limit),
282            ..self
283        }
284    }
285
286    /// Provide an offset to skip over the given number of rows
287    ///
288    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
289    /// allowing it to skip rows after any pushed down predicates
290    ///
291    /// It is recommended to enable reading the page index if using this functionality, to allow
292    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
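    ///
    /// For example, the following sketch skips the first row of a small in-memory
    /// file and then decodes at most one row (the column name is illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # let batch = RecordBatch::try_from_iter([
    /// #     ("v", Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef),
    /// # ]).unwrap();
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))
    ///     .unwrap()
    ///     .with_offset(1)
    ///     .with_limit(1)
    ///     .build()
    ///     .unwrap();
    /// let batch = reader.next().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 1);
    /// assert_eq!(batch.column(0).as_primitive::<Int32Type>().value(0), 20);
    /// ```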
293    pub fn with_offset(self, offset: usize) -> Self {
294        Self {
295            offset: Some(offset),
296            ..self
297        }
298    }
299}
300
301/// Options that control how metadata is read for a parquet file
302///
303/// See [`ArrowReaderBuilder`] for how to configure how the column data
304/// is then read from the file, including projection and filter pushdown
305#[derive(Debug, Clone, Default)]
306pub struct ArrowReaderOptions {
307    /// Should the reader strip any user defined metadata from the Arrow schema
308    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
311    ///
312    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
313    supplied_schema: Option<SchemaRef>,
314    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
315    pub(crate) page_index: bool,
316    /// If encryption is enabled, the file decryption properties can be provided
317    #[cfg(feature = "encryption")]
318    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
319}
320
321impl ArrowReaderOptions {
322    /// Create a new [`ArrowReaderOptions`] with the default settings
323    pub fn new() -> Self {
324        Self::default()
325    }
326
327    /// Skip decoding the embedded arrow metadata (defaults to `false`)
328    ///
329    /// Parquet files generated by some writers may contain embedded arrow
330    /// schema and metadata.
331    /// This may not be correct or compatible with your system,
332    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
333    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
334        Self {
335            skip_arrow_metadata,
336            ..self
337        }
338    }
339
340    /// Provide a schema hint to use when reading the Parquet file.
341    ///
342    /// If provided, this schema takes precedence over any arrow schema embedded
343    /// in the metadata (see the [`arrow`] documentation for more details).
344    ///
    /// If the provided schema is not compatible with the schema of the data stored
    /// in the parquet file, an error will be returned when constructing the
347    /// builder.
348    ///
349    /// This option is only required if you want to explicitly control the
350    /// conversion of Parquet types to Arrow types, such as casting a column to
351    /// a different type. For example, if you wanted to read an Int64 in
352    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
353    ///
354    /// [`arrow`]: crate::arrow
355    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
356    ///
357    /// # Notes
358    ///
359    /// The provided schema must have the same number of columns as the parquet schema and
360    /// the column names must be the same.
361    ///
362    /// # Example
363    /// ```
365    /// use std::sync::Arc;
366    /// use tempfile::tempfile;
367    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
368    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
369    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
370    /// use parquet::arrow::ArrowWriter;
371    ///
372    /// // Write data - schema is inferred from the data to be Int32
373    /// let file = tempfile().unwrap();
374    /// let batch = RecordBatch::try_from_iter(vec![
375    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
376    /// ]).unwrap();
377    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
378    /// writer.write(&batch).unwrap();
379    /// writer.close().unwrap();
380    ///
381    /// // Read the file back.
382    /// // Supply a schema that interprets the Int32 column as a Timestamp.
383    /// let supplied_schema = Arc::new(Schema::new(vec![
384    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
385    /// ]));
386    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
387    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
388    ///     file.try_clone().unwrap(),
389    ///     options
390    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
391    ///
392    /// // Create the reader and read the data using the supplied schema.
393    /// let mut reader = builder.build().unwrap();
394    /// let _batch = reader.next().unwrap().unwrap();
395    /// ```
396    pub fn with_schema(self, schema: SchemaRef) -> Self {
397        Self {
398            supplied_schema: Some(schema),
399            skip_arrow_metadata: true,
400            ..self
401        }
402    }
403
404    /// Enable reading [`PageIndex`], if present (defaults to `false`)
405    ///
    /// Some query engines use the `PageIndex` to push predicates down into the
    /// Parquet scan, potentially eliminating unnecessary IO.
408    ///
409    /// If this is enabled, [`ParquetMetaData::column_index`] and
410    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
411    /// information is present in the file.
412    ///
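    /// For example, to request the page index when loading metadata:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// assert!(options.page_index());
    /// ```
    ///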
413    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
414    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
415    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
416    pub fn with_page_index(self, page_index: bool) -> Self {
417        Self { page_index, ..self }
418    }
419
420    /// Provide the file decryption properties to use when reading encrypted parquet files.
421    ///
422    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
423    #[cfg(feature = "encryption")]
424    pub fn with_file_decryption_properties(
425        self,
426        file_decryption_properties: FileDecryptionProperties,
427    ) -> Self {
428        Self {
429            file_decryption_properties: Some(file_decryption_properties),
430            ..self
431        }
432    }
433
434    /// Retrieve the currently set page index behavior.
435    ///
436    /// This can be set via [`with_page_index`][Self::with_page_index].
437    pub fn page_index(&self) -> bool {
438        self.page_index
439    }
440
441    /// Retrieve the currently set file decryption properties.
442    ///
443    /// This can be set via
    /// [`with_file_decryption_properties`][Self::with_file_decryption_properties].
445    #[cfg(feature = "encryption")]
446    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
447        self.file_decryption_properties.as_ref()
448    }
449}
450
/// The metadata necessary to construct an [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply cloneable, as it consists of several [`Arc`]s.
454///
455/// This structure allows
456///
457/// 1. Loading metadata for a file once and then using that same metadata to
458///    construct multiple separate readers, for example, to distribute readers
459///    across multiple threads
460///
/// 2. Using a cached copy of the [`ParquetMetaData`] rather than reading it
462///    from the file each time a reader is constructed.
463///
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
465#[derive(Debug, Clone)]
466pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
468    pub(crate) metadata: Arc<ParquetMetaData>,
469    /// The Arrow Schema
470    pub(crate) schema: SchemaRef,
471
472    pub(crate) fields: Option<Arc<ParquetField>>,
473}
474
475impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
477    ///
478    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
479    /// example of how this can be used
480    ///
481    /// # Notes
482    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to `true`, and the
    /// page index is not already present in the metadata, this function will attempt
    /// to load the page index with additional reads from `reader`.
486    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
487        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
488        #[cfg(feature = "encryption")]
489        let metadata =
490            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
491        let metadata = metadata.parse_and_finish(reader)?;
492        Self::try_new(Arc::new(metadata), options)
493    }
494
495    /// Create a new [`ArrowReaderMetadata`]
496    ///
497    /// # Notes
498    ///
499    /// This function does not attempt to load the PageIndex if not present in the metadata.
500    /// See [`Self::load`] for more details.
501    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
502        match options.supplied_schema {
503            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
504            None => {
505                let kv_metadata = match options.skip_arrow_metadata {
506                    true => None,
507                    false => metadata.file_metadata().key_value_metadata(),
508                };
509
510                let (schema, fields) = parquet_to_arrow_schema_and_fields(
511                    metadata.file_metadata().schema_descr(),
512                    ProjectionMask::all(),
513                    kv_metadata,
514                )?;
515
516                Ok(Self {
517                    metadata,
518                    schema: Arc::new(schema),
519                    fields: fields.map(Arc::new),
520                })
521            }
522        }
523    }
524
525    fn with_supplied_schema(
526        metadata: Arc<ParquetMetaData>,
527        supplied_schema: SchemaRef,
528    ) -> Result<Self> {
529        let parquet_schema = metadata.file_metadata().schema_descr();
530        let field_levels = parquet_to_arrow_field_levels(
531            parquet_schema,
532            ProjectionMask::all(),
533            Some(supplied_schema.fields()),
534        )?;
535        let fields = field_levels.fields;
536        let inferred_len = fields.len();
537        let supplied_len = supplied_schema.fields().len();
538        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
        // different lengths, but we check here to be safe.
541        if inferred_len != supplied_len {
542            return Err(arrow_err!(format!(
543                "Incompatible supplied Arrow schema: expected {} columns received {}",
544                inferred_len, supplied_len
545            )));
546        }
547
548        let mut errors = Vec::new();
549
550        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
551
552        for (field1, field2) in field_iter {
553            if field1.data_type() != field2.data_type() {
554                errors.push(format!(
555                    "data type mismatch for field {}: requested {:?} but found {:?}",
556                    field1.name(),
557                    field1.data_type(),
558                    field2.data_type()
559                ));
560            }
561            if field1.is_nullable() != field2.is_nullable() {
562                errors.push(format!(
563                    "nullability mismatch for field {}: expected {:?} but found {:?}",
564                    field1.name(),
565                    field1.is_nullable(),
566                    field2.is_nullable()
567                ));
568            }
569            if field1.metadata() != field2.metadata() {
570                errors.push(format!(
571                    "metadata mismatch for field {}: expected {:?} but found {:?}",
572                    field1.name(),
573                    field1.metadata(),
574                    field2.metadata()
575                ));
576            }
577        }
578
579        if !errors.is_empty() {
580            let message = errors.join(", ");
581            return Err(ParquetError::ArrowError(format!(
582                "Incompatible supplied Arrow schema: {message}",
583            )));
584        }
585
586        Ok(Self {
587            metadata,
588            schema: supplied_schema,
589            fields: field_levels.levels.map(Arc::new),
590        })
591    }
592
593    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
594    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
595        &self.metadata
596    }
597
598    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
599    pub fn parquet_schema(&self) -> &SchemaDescriptor {
600        self.metadata.file_metadata().schema_descr()
601    }
602
603    /// Returns the arrow [`SchemaRef`] for this parquet file
604    pub fn schema(&self) -> &SchemaRef {
605        &self.schema
606    }
607}
608
609#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
611pub struct SyncReader<T: ChunkReader>(T);
612
613impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
614    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
615        f.debug_tuple("SyncReader").field(&self.0).finish()
616    }
617}
618
619/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
620///
621/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
622///
623/// See [`ArrowReaderBuilder`] for additional member functions
624pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
625
626impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
627    /// Create a new [`ParquetRecordBatchReaderBuilder`]
628    ///
629    /// ```
630    /// # use std::sync::Arc;
631    /// # use bytes::Bytes;
632    /// # use arrow_array::{Int32Array, RecordBatch};
633    /// # use arrow_schema::{DataType, Field, Schema};
634    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
635    /// # use parquet::arrow::ArrowWriter;
636    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
637    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
638    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
639    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
640    /// # writer.write(&batch).unwrap();
641    /// # writer.close().unwrap();
642    /// # let file = Bytes::from(file);
643    /// #
644    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
645    ///
646    /// // Inspect metadata
647    /// assert_eq!(builder.metadata().num_row_groups(), 1);
648    ///
649    /// // Construct reader
650    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
651    ///
652    /// // Read data
653    /// let _batch = reader.next().unwrap().unwrap();
654    /// ```
655    pub fn try_new(reader: T) -> Result<Self> {
656        Self::try_new_with_options(reader, Default::default())
657    }
658
659    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
660    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
661        let metadata = ArrowReaderMetadata::load(&reader, options)?;
662        Ok(Self::new_with_metadata(reader, metadata))
663    }
664
665    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
666    ///
667    /// This interface allows:
668    ///
669    /// 1. Loading metadata once and using it to create multiple builders with
670    ///    potentially different settings or run on different threads
671    ///
672    /// 2. Using a cached copy of the metadata rather than re-reading it from the
673    ///    file each time a reader is constructed.
674    ///
675    /// See the docs on [`ArrowReaderMetadata`] for more details
676    ///
677    /// # Example
678    /// ```
679    /// # use std::fs::metadata;
680    /// # use std::sync::Arc;
681    /// # use bytes::Bytes;
682    /// # use arrow_array::{Int32Array, RecordBatch};
683    /// # use arrow_schema::{DataType, Field, Schema};
684    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
685    /// # use parquet::arrow::ArrowWriter;
686    /// #
687    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
688    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
689    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
690    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
691    /// # writer.write(&batch).unwrap();
692    /// # writer.close().unwrap();
693    /// # let file = Bytes::from(file);
694    /// #
695    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
696    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
697    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
698    ///
699    /// // Should be able to read from both in parallel
700    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
701    /// ```
702    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
703        Self::new_builder(SyncReader(input), metadata)
704    }
705
706    /// Build a [`ParquetRecordBatchReader`]
707    ///
708    /// Note: this will eagerly evaluate any `RowFilter` before returning
709    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        // Try to avoid allocating a large buffer
711        let batch_size = self
712            .batch_size
713            .min(self.metadata.file_metadata().num_rows() as usize);
714
715        let row_groups = self
716            .row_groups
717            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
718
719        let reader = ReaderRowGroups {
720            reader: Arc::new(self.input.0),
721            metadata: self.metadata,
722            row_groups,
723        };
724
725        let mut filter = self.filter;
726        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
727
728        // Update selection based on any filters
729        if let Some(filter) = filter.as_mut() {
730            for predicate in filter.predicates.iter_mut() {
731                // break early if we have ruled out all rows
732                if !plan_builder.selects_any() {
733                    break;
734                }
735
736                let array_reader = ArrayReaderBuilder::new(&reader)
737                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
738
739                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
740            }
741        }
742
743        let array_reader = ArrayReaderBuilder::new(&reader)
744            .build_array_reader(self.fields.as_deref(), &self.projection)?;
745
746        let read_plan = plan_builder
747            .limited(reader.num_rows())
748            .with_offset(self.offset)
749            .with_limit(self.limit)
750            .build_limited()
751            .build();
752
753        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
754    }
755}
756
757struct ReaderRowGroups<T: ChunkReader> {
758    reader: Arc<T>,
759
760    metadata: Arc<ParquetMetaData>,
    /// List of row group indices to scan
762    row_groups: Vec<usize>,
763}
764
765impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
766    fn num_rows(&self) -> usize {
767        let meta = self.metadata.row_groups();
768        self.row_groups
769            .iter()
770            .map(|x| meta[*x].num_rows() as usize)
771            .sum()
772    }
773
774    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
775        Ok(Box::new(ReaderPageIterator {
776            column_idx: i,
777            reader: self.reader.clone(),
778            metadata: self.metadata.clone(),
779            row_groups: self.row_groups.clone().into_iter(),
780        }))
781    }
782}
783
784struct ReaderPageIterator<T: ChunkReader> {
785    reader: Arc<T>,
786    column_idx: usize,
787    row_groups: std::vec::IntoIter<usize>,
788    metadata: Arc<ParquetMetaData>,
789}
790
791impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
792    /// Return the next SerializedPageReader
793    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
794        let rg = self.metadata.row_group(rg_idx);
795        let column_chunk_metadata = rg.column(self.column_idx);
796        let offset_index = self.metadata.offset_index();
        // The `offset_index` may not exist, in which case `i[rg_idx]` will be empty.
        // To avoid a panic when indexing `i[rg_idx][self.column_idx]`, filter out empty `i[rg_idx]`.
799        let page_locations = offset_index
800            .filter(|i| !i[rg_idx].is_empty())
801            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
802        let total_rows = rg.num_rows() as usize;
803        let reader = self.reader.clone();
804
805        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
806            .add_crypto_context(
807                rg_idx,
808                self.column_idx,
809                self.metadata.as_ref(),
810                column_chunk_metadata,
811            )
812    }
813}
814
815impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
816    type Item = Result<Box<dyn PageReader>>;
817
818    fn next(&mut self) -> Option<Self::Item> {
819        let rg_idx = self.row_groups.next()?;
820        let page_reader = self
821            .next_page_reader(rg_idx)
822            .map(|page_reader| Box::new(page_reader) as _);
823        Some(page_reader)
824    }
825}
826
827impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
828
/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]
830/// read from a parquet data source
831pub struct ParquetRecordBatchReader {
832    array_reader: Box<dyn ArrayReader>,
833    schema: SchemaRef,
834    read_plan: ReadPlan,
835}
836
837impl Iterator for ParquetRecordBatchReader {
838    type Item = Result<RecordBatch, ArrowError>;
839
840    fn next(&mut self) -> Option<Self::Item> {
841        self.next_inner()
842            .map_err(|arrow_err| arrow_err.into())
843            .transpose()
844    }
845}
846
847impl ParquetRecordBatchReader {
848    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
849    /// has reached the end of the file.
850    ///
851    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
852    /// simplify error handling with `?`
853    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
854        let mut read_records = 0;
855        let batch_size = self.batch_size();
856        match self.read_plan.selection_mut() {
857            Some(selection) => {
858                while read_records < batch_size && !selection.is_empty() {
859                    let front = selection.pop_front().unwrap();
860                    if front.skip {
861                        let skipped = self.array_reader.skip_records(front.row_count)?;
862
863                        if skipped != front.row_count {
864                            return Err(general_err!(
865                                "failed to skip rows, expected {}, got {}",
866                                front.row_count,
867                                skipped
868                            ));
869                        }
870                        continue;
871                    }
872
                    // Currently, a `RowSelector` with `row_count == 0` is interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
875                    if front.row_count == 0 {
876                        continue;
877                    }
878
879                    // try to read record
880                    let need_read = batch_size - read_records;
881                    let to_read = match front.row_count.checked_sub(need_read) {
882                        Some(remaining) if remaining != 0 => {
                            // If this selector covers more rows than are still needed for
                            // this batch, read only what is needed and push the remainder
                            // back onto the selection to avoid an infinite loop.
885                            selection.push_front(RowSelector::select(remaining));
886                            need_read
887                        }
888                        _ => front.row_count,
889                    };
890                    match self.array_reader.read_records(to_read)? {
891                        0 => break,
892                        rec => read_records += rec,
893                    };
894                }
895            }
896            None => {
897                self.array_reader.read_records(batch_size)?;
898            }
899        };
900
901        let array = self.array_reader.consume_batch()?;
902        let struct_array = array.as_struct_opt().ok_or_else(|| {
903            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
904        })?;
905
906        Ok(if struct_array.len() > 0 {
907            Some(RecordBatch::from(struct_array))
908        } else {
909            None
910        })
911    }
912}
913
914impl RecordBatchReader for ParquetRecordBatchReader {
915    /// Returns the projected [`SchemaRef`] for reading the parquet file.
916    ///
917    /// Note that the schema metadata will be stripped here. See
918    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
919    fn schema(&self) -> SchemaRef {
920        self.schema.clone()
921    }
922}
923
924impl ParquetRecordBatchReader {
925    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
926    ///
927    /// See [`ParquetRecordBatchReaderBuilder`] for more options
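    ///
    /// # Example
    ///
    /// A minimal round trip: write a small batch to an in-memory buffer and read
    /// it back two rows at a time (the column name is illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # let batch = RecordBatch::try_from_iter([
    /// #     ("v", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
    /// # ]).unwrap();
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 2).unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// assert_eq!(batches.len(), 2);
    /// assert_eq!(batches[0].num_rows(), 2);
    /// assert_eq!(batches[1].num_rows(), 1);
    /// ```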
928    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
929        ParquetRecordBatchReaderBuilder::try_new(reader)?
930            .with_batch_size(batch_size)
931            .build()
932    }
933
934    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
935    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
    /// higher-level interface for reading parquet data from a file
938    pub fn try_new_with_row_groups(
939        levels: &FieldLevels,
940        row_groups: &dyn RowGroups,
941        batch_size: usize,
942        selection: Option<RowSelection>,
943    ) -> Result<Self> {
944        let array_reader = ArrayReaderBuilder::new(row_groups)
945            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
946
947        let read_plan = ReadPlanBuilder::new(batch_size)
948            .with_selection(selection)
949            .build();
950
951        Ok(Self {
952            array_reader,
953            schema: Arc::new(Schema::new(levels.fields.clone())),
954            read_plan,
955        })
956    }
957
    /// Create a new [`ParquetRecordBatchReader`] that reads from the provided [`ArrayReader`]
    /// according to the given [`ReadPlan`], which controls the batch size and which rows
    /// are decoded or skipped
961    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
962        let schema = match array_reader.get_data_type() {
963            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
964            _ => unreachable!("Struct array reader's data type is not struct!"),
965        };
966
967        Self {
968            array_reader,
969            schema: Arc::new(schema),
970            read_plan,
971        }
972    }
973
974    #[inline(always)]
975    pub(crate) fn batch_size(&self) -> usize {
976        self.read_plan.batch_size()
977    }
978}
979
980#[cfg(test)]
981mod tests {
982    use std::cmp::min;
983    use std::collections::{HashMap, VecDeque};
984    use std::fmt::Formatter;
985    use std::fs::File;
986    use std::io::Seek;
987    use std::path::PathBuf;
988    use std::sync::Arc;
989
990    use arrow_array::builder::*;
991    use arrow_array::cast::AsArray;
992    use arrow_array::types::{
993        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
994        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
995    };
996    use arrow_array::*;
997    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
998    use arrow_data::{ArrayData, ArrayDataBuilder};
999    use arrow_schema::{
1000        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1001    };
1002    use arrow_select::concat::concat_batches;
1003    use bytes::Bytes;
1004    use half::f16;
1005    use num::PrimInt;
1006    use rand::{rng, Rng, RngCore};
1007    use tempfile::tempfile;
1008
1009    use crate::arrow::arrow_reader::{
1010        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1011        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1012    };
1013    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1014    use crate::arrow::{ArrowWriter, ProjectionMask};
1015    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1016    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1017    use crate::data_type::{
1018        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1019        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1020    };
1021    use crate::errors::Result;
1022    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1023    use crate::file::writer::SerializedFileWriter;
1024    use crate::schema::parser::parse_message_type;
1025    use crate::schema::types::{Type, TypePtr};
1026    use crate::util::test_common::rand_gen::RandGen;
1027
1028    #[test]
1029    fn test_arrow_reader_all_columns() {
1030        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1031
1032        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1033        let original_schema = Arc::clone(builder.schema());
1034        let reader = builder.build().unwrap();
1035
1036        // Verify that the schema was correctly parsed
1037        assert_eq!(original_schema.fields(), reader.schema().fields());
1038    }
1039
1040    #[test]
1041    fn test_arrow_reader_single_column() {
1042        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1043
1044        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045        let original_schema = Arc::clone(builder.schema());
1046
1047        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1048        let reader = builder.with_projection(mask).build().unwrap();
1049
1050        // Verify that the schema was correctly parsed
1051        assert_eq!(1, reader.schema().fields().len());
1052        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1053    }
1054
1055    #[test]
1056    fn test_arrow_reader_single_column_by_name() {
1057        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1058
1059        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1060        let original_schema = Arc::clone(builder.schema());
1061
1062        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1063        let reader = builder.with_projection(mask).build().unwrap();
1064
1065        // Verify that the schema was correctly parsed
1066        assert_eq!(1, reader.schema().fields().len());
1067        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1068    }
1069
1070    #[test]
1071    fn test_null_column_reader_test() {
1072        let mut file = tempfile::tempfile().unwrap();
1073
1074        let schema = "
1075            message message {
1076                OPTIONAL INT32 int32;
1077            }
1078        ";
1079        let schema = Arc::new(parse_message_type(schema).unwrap());
1080
1081        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1082        generate_single_column_file_with_data::<Int32Type>(
1083            &[vec![], vec![]],
1084            Some(&def_levels),
1085            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1086            schema,
1087            Some(Field::new("int32", ArrowDataType::Null, true)),
1088            &Default::default(),
1089        )
1090        .unwrap();
1091
1092        file.rewind().unwrap();
1093
1094        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1095        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1096
1097        assert_eq!(batches.len(), 4);
1098        for batch in &batches[0..3] {
1099            assert_eq!(batch.num_rows(), 2);
1100            assert_eq!(batch.num_columns(), 1);
1101            assert_eq!(batch.column(0).null_count(), 2);
1102        }
1103
1104        assert_eq!(batches[3].num_rows(), 1);
1105        assert_eq!(batches[3].num_columns(), 1);
1106        assert_eq!(batches[3].column(0).null_count(), 1);
1107    }
1108
1109    #[test]
1110    fn test_primitive_single_column_reader_test() {
1111        run_single_column_reader_tests::<BoolType, _, BoolType>(
1112            2,
1113            ConvertedType::NONE,
1114            None,
1115            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1116            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1117        );
1118        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1119            2,
1120            ConvertedType::NONE,
1121            None,
1122            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1123            &[
1124                Encoding::PLAIN,
1125                Encoding::RLE_DICTIONARY,
1126                Encoding::DELTA_BINARY_PACKED,
1127                Encoding::BYTE_STREAM_SPLIT,
1128            ],
1129        );
1130        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1131            2,
1132            ConvertedType::NONE,
1133            None,
1134            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1135            &[
1136                Encoding::PLAIN,
1137                Encoding::RLE_DICTIONARY,
1138                Encoding::DELTA_BINARY_PACKED,
1139                Encoding::BYTE_STREAM_SPLIT,
1140            ],
1141        );
1142        run_single_column_reader_tests::<FloatType, _, FloatType>(
1143            2,
1144            ConvertedType::NONE,
1145            None,
1146            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1147            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1148        );
1149    }
1150
1151    #[test]
1152    fn test_unsigned_primitive_single_column_reader_test() {
1153        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1154            2,
1155            ConvertedType::UINT_32,
1156            Some(ArrowDataType::UInt32),
1157            |vals| {
1158                Arc::new(UInt32Array::from_iter(
1159                    vals.iter().map(|x| x.map(|x| x as u32)),
1160                ))
1161            },
1162            &[
1163                Encoding::PLAIN,
1164                Encoding::RLE_DICTIONARY,
1165                Encoding::DELTA_BINARY_PACKED,
1166            ],
1167        );
1168        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1169            2,
1170            ConvertedType::UINT_64,
1171            Some(ArrowDataType::UInt64),
1172            |vals| {
1173                Arc::new(UInt64Array::from_iter(
1174                    vals.iter().map(|x| x.map(|x| x as u64)),
1175                ))
1176            },
1177            &[
1178                Encoding::PLAIN,
1179                Encoding::RLE_DICTIONARY,
1180                Encoding::DELTA_BINARY_PACKED,
1181            ],
1182        );
1183    }
1184
1185    #[test]
1186    fn test_unsigned_roundtrip() {
1187        let schema = Arc::new(Schema::new(vec![
1188            Field::new("uint32", ArrowDataType::UInt32, true),
1189            Field::new("uint64", ArrowDataType::UInt64, true),
1190        ]));
1191
1192        let mut buf = Vec::with_capacity(1024);
1193        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1194
1195        let original = RecordBatch::try_new(
1196            schema,
1197            vec![
1198                Arc::new(UInt32Array::from_iter_values([
1199                    0,
1200                    i32::MAX as u32,
1201                    u32::MAX,
1202                ])),
1203                Arc::new(UInt64Array::from_iter_values([
1204                    0,
1205                    i64::MAX as u64,
1206                    u64::MAX,
1207                ])),
1208            ],
1209        )
1210        .unwrap();
1211
1212        writer.write(&original).unwrap();
1213        writer.close().unwrap();
1214
1215        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1216        let ret = reader.next().unwrap().unwrap();
1217        assert_eq!(ret, original);
1218
1219        // Check they can be downcast to the correct type
1220        ret.column(0)
1221            .as_any()
1222            .downcast_ref::<UInt32Array>()
1223            .unwrap();
1224
1225        ret.column(1)
1226            .as_any()
1227            .downcast_ref::<UInt64Array>()
1228            .unwrap();
1229    }
1230
1231    #[test]
1232    fn test_float16_roundtrip() -> Result<()> {
1233        let schema = Arc::new(Schema::new(vec![
1234            Field::new("float16", ArrowDataType::Float16, false),
1235            Field::new("float16-nullable", ArrowDataType::Float16, true),
1236        ]));
1237
1238        let mut buf = Vec::with_capacity(1024);
1239        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1240
1241        let original = RecordBatch::try_new(
1242            schema,
1243            vec![
1244                Arc::new(Float16Array::from_iter_values([
1245                    f16::EPSILON,
1246                    f16::MIN,
1247                    f16::MAX,
1248                    f16::NAN,
1249                    f16::INFINITY,
1250                    f16::NEG_INFINITY,
1251                    f16::ONE,
1252                    f16::NEG_ONE,
1253                    f16::ZERO,
1254                    f16::NEG_ZERO,
1255                    f16::E,
1256                    f16::PI,
1257                    f16::FRAC_1_PI,
1258                ])),
1259                Arc::new(Float16Array::from(vec![
1260                    None,
1261                    None,
1262                    None,
1263                    Some(f16::NAN),
1264                    Some(f16::INFINITY),
1265                    Some(f16::NEG_INFINITY),
1266                    None,
1267                    None,
1268                    None,
1269                    None,
1270                    None,
1271                    None,
1272                    Some(f16::FRAC_1_PI),
1273                ])),
1274            ],
1275        )?;
1276
1277        writer.write(&original)?;
1278        writer.close()?;
1279
1280        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1281        let ret = reader.next().unwrap()?;
1282        assert_eq!(ret, original);
1283
1284        // Ensure can be downcast to the correct type
1285        ret.column(0).as_primitive::<Float16Type>();
1286        ret.column(1).as_primitive::<Float16Type>();
1287
1288        Ok(())
1289    }
1290
1291    #[test]
1292    fn test_time_utc_roundtrip() -> Result<()> {
1293        let schema = Arc::new(Schema::new(vec![
1294            Field::new(
1295                "time_millis",
1296                ArrowDataType::Time32(TimeUnit::Millisecond),
1297                true,
1298            )
1299            .with_metadata(HashMap::from_iter(vec![(
1300                "adjusted_to_utc".to_string(),
1301                "".to_string(),
1302            )])),
1303            Field::new(
1304                "time_micros",
1305                ArrowDataType::Time64(TimeUnit::Microsecond),
1306                true,
1307            )
1308            .with_metadata(HashMap::from_iter(vec![(
1309                "adjusted_to_utc".to_string(),
1310                "".to_string(),
1311            )])),
1312        ]));
1313
1314        let mut buf = Vec::with_capacity(1024);
1315        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1316
1317        let original = RecordBatch::try_new(
1318            schema,
1319            vec![
1320                Arc::new(Time32MillisecondArray::from(vec![
1321                    Some(-1),
1322                    Some(0),
1323                    Some(86_399_000),
1324                    Some(86_400_000),
1325                    Some(86_401_000),
1326                    None,
1327                ])),
1328                Arc::new(Time64MicrosecondArray::from(vec![
1329                    Some(-1),
1330                    Some(0),
1331                    Some(86_399 * 1_000_000),
1332                    Some(86_400 * 1_000_000),
1333                    Some(86_401 * 1_000_000),
1334                    None,
1335                ])),
1336            ],
1337        )?;
1338
1339        writer.write(&original)?;
1340        writer.close()?;
1341
1342        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1343        let ret = reader.next().unwrap()?;
1344        assert_eq!(ret, original);
1345
1346        // Ensure can be downcast to the correct type
1347        ret.column(0).as_primitive::<Time32MillisecondType>();
1348        ret.column(1).as_primitive::<Time64MicrosecondType>();
1349
1350        Ok(())
1351    }
1352
1353    #[test]
1354    fn test_date32_roundtrip() -> Result<()> {
1355        use arrow_array::Date32Array;
1356
1357        let schema = Arc::new(Schema::new(vec![Field::new(
1358            "date32",
1359            ArrowDataType::Date32,
1360            false,
1361        )]));
1362
1363        let mut buf = Vec::with_capacity(1024);
1364
1365        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1366
1367        let original = RecordBatch::try_new(
1368            schema,
1369            vec![Arc::new(Date32Array::from(vec![
1370                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1371            ]))],
1372        )?;
1373
1374        writer.write(&original)?;
1375        writer.close()?;
1376
1377        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1378        let ret = reader.next().unwrap()?;
1379        assert_eq!(ret, original);
1380
1381        // Ensure can be downcast to the correct type
1382        ret.column(0).as_primitive::<Date32Type>();
1383
1384        Ok(())
1385    }
1386
1387    #[test]
1388    fn test_date64_roundtrip() -> Result<()> {
1389        use arrow_array::Date64Array;
1390
1391        let schema = Arc::new(Schema::new(vec![
1392            Field::new("small-date64", ArrowDataType::Date64, false),
1393            Field::new("big-date64", ArrowDataType::Date64, false),
1394            Field::new("invalid-date64", ArrowDataType::Date64, false),
1395        ]));
1396
1397        let mut default_buf = Vec::with_capacity(1024);
1398        let mut coerce_buf = Vec::with_capacity(1024);
1399
1400        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1401
1402        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1403        let mut coerce_writer =
1404            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1405
1406        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1407
1408        let original = RecordBatch::try_new(
1409            schema,
1410            vec![
1411                // small-date64
1412                Arc::new(Date64Array::from(vec![
1413                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1414                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1415                    0,
1416                    1_000 * NUM_MILLISECONDS_IN_DAY,
1417                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1418                ])),
1419                // big-date64
1420                Arc::new(Date64Array::from(vec![
1421                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1422                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1423                    0,
1424                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1425                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1426                ])),
1427                // invalid-date64
1428                Arc::new(Date64Array::from(vec![
1429                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1430                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1431                    1,
1432                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1433                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1434                ])),
1435            ],
1436        )?;
1437
1438        default_writer.write(&original)?;
1439        coerce_writer.write(&original)?;
1440
1441        default_writer.close()?;
1442        coerce_writer.close()?;
1443
1444        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1445        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1446
1447        let default_ret = default_reader.next().unwrap()?;
1448        let coerce_ret = coerce_reader.next().unwrap()?;
1449
1450        // Roundtrip should be successful when default writer used
1451        assert_eq!(default_ret, original);
1452
1453        // Only small-date64 should roundtrip successfully when coerce_types writer is used
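        // (A note on why: with `coerce_types` enabled the writer presumably stores Date64 as a
        // 32-bit Parquet DATE in whole days, so values that are not exact multiples of a day are
        // truncated and values outside the i32 day range overflow. This is an interpretation of
        // the assertions below, not a statement taken from the writer documentation.)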
1454        assert_eq!(coerce_ret.column(0), original.column(0));
1455        assert_ne!(coerce_ret.column(1), original.column(1));
1456        assert_ne!(coerce_ret.column(2), original.column(2));
1457
1458        // Ensure both can be downcast to the correct type
1459        default_ret.column(0).as_primitive::<Date64Type>();
1460        coerce_ret.column(0).as_primitive::<Date64Type>();
1461
1462        Ok(())
1463    }
1464    struct RandFixedLenGen {}
1465
1466    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1467        fn gen(len: i32) -> FixedLenByteArray {
1468            let mut v = vec![0u8; len as usize];
1469            rng().fill_bytes(&mut v);
1470            ByteArray::from(v).into()
1471        }
1472    }
1473
1474    #[test]
1475    fn test_fixed_length_binary_column_reader() {
1476        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1477            20,
1478            ConvertedType::NONE,
1479            None,
1480            |vals| {
1481                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1482                for val in vals {
1483                    match val {
1484                        Some(b) => builder.append_value(b).unwrap(),
1485                        None => builder.append_null(),
1486                    }
1487                }
1488                Arc::new(builder.finish())
1489            },
1490            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1491        );
1492    }
1493
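    // Parquet's legacy INTERVAL converted type annotates a FIXED_LEN_BYTE_ARRAY(12) holding three
    // little-endian u32 values: months, days and milliseconds. The converter in the test below
    // ignores the months component and maps days/milliseconds onto Arrow's IntervalDayTime.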
1494    #[test]
1495    fn test_interval_day_time_column_reader() {
1496        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1497            12,
1498            ConvertedType::INTERVAL,
1499            None,
1500            |vals| {
1501                Arc::new(
1502                    vals.iter()
1503                        .map(|x| {
1504                            x.as_ref().map(|b| IntervalDayTime {
1505                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1506                                milliseconds: i32::from_le_bytes(
1507                                    b.as_ref()[8..12].try_into().unwrap(),
1508                                ),
1509                            })
1510                        })
1511                        .collect::<IntervalDayTimeArray>(),
1512                )
1513            },
1514            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1515        );
1516    }
1517
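    // An INT96 timestamp packs nanoseconds-within-day (first 8 bytes) and a Julian day number
    // (last 4 bytes). The test below verifies that, depending on the supplied Timestamp type
    // hint, the reader converts INT96 values to the requested Arrow TimeUnit (and timezone)
    // instead of always producing nanoseconds.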
1518    #[test]
1519    fn test_int96_single_column_reader_test() {
1520        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1521
1522        type TypeHintAndConversionFunction =
1523            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1524
1525        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1526            // Test without a specified ArrowType hint.
1527            (None, |vals: &[Option<Int96>]| {
1528                Arc::new(TimestampNanosecondArray::from_iter(
1529                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1530                )) as ArrayRef
1531            }),
1532            // Test other TimeUnits as ArrowType hints.
1533            (
1534                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1535                |vals: &[Option<Int96>]| {
1536                    Arc::new(TimestampSecondArray::from_iter(
1537                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1538                    )) as ArrayRef
1539                },
1540            ),
1541            (
1542                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1543                |vals: &[Option<Int96>]| {
1544                    Arc::new(TimestampMillisecondArray::from_iter(
1545                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1546                    )) as ArrayRef
1547                },
1548            ),
1549            (
1550                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1551                |vals: &[Option<Int96>]| {
1552                    Arc::new(TimestampMicrosecondArray::from_iter(
1553                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1554                    )) as ArrayRef
1555                },
1556            ),
1557            (
1558                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1559                |vals: &[Option<Int96>]| {
1560                    Arc::new(TimestampNanosecondArray::from_iter(
1561                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1562                    )) as ArrayRef
1563                },
1564            ),
1565            // Test a Timestamp hint that also specifies a timezone.
1566            (
1567                Some(ArrowDataType::Timestamp(
1568                    TimeUnit::Second,
1569                    Some(Arc::from("-05:00")),
1570                )),
1571                |vals: &[Option<Int96>]| {
1572                    Arc::new(
1573                        TimestampSecondArray::from_iter(
1574                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1575                        )
1576                        .with_timezone("-05:00"),
1577                    ) as ArrayRef
1578                },
1579            ),
1580        ];
1581
1582        resolutions.iter().for_each(|(arrow_type, converter)| {
1583            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1584                2,
1585                ConvertedType::NONE,
1586                arrow_type.clone(),
1587                converter,
1588                encodings,
1589            );
1590        })
1591    }
1592
1593    #[test]
1594    fn test_int96_from_spark_file_with_provided_schema() {
1595        // int96_from_spark.parquet was written with Spark's microsecond timestamps, which trade
1596        // resolution for range compared to a nanosecond timestamp. We must provide a schema with
1597        // microsecond resolution for the Parquet reader to interpret these values correctly.
1598        use arrow_schema::DataType::Timestamp;
1599        let test_data = arrow::util::test_util::parquet_test_data();
1600        let path = format!("{test_data}/int96_from_spark.parquet");
1601        let file = File::open(path).unwrap();
1602
1603        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1604            "a",
1605            Timestamp(TimeUnit::Microsecond, None),
1606            true,
1607        )]));
1608        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1609
1610        let mut record_reader =
1611            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1612                .unwrap()
1613                .build()
1614                .unwrap();
1615
1616        let batch = record_reader.next().unwrap().unwrap();
1617        assert_eq!(batch.num_columns(), 1);
1618        let column = batch.column(0);
1619        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1620
1621        let expected = Arc::new(Int64Array::from(vec![
1622            Some(1704141296123456),
1623            Some(1704070800000000),
1624            Some(253402225200000000),
1625            Some(1735599600000000),
1626            None,
1627            Some(9089380393200000000),
1628        ]));
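        // These microsecond values include timestamps in years 9999 and ~290000 that fit in i64
        // microseconds but would overflow i64 nanoseconds; the companion test below shows what
        // happens when no schema hint is provided.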
1629
1630        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1631        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1632        // anyway, so this should be a zero-copy non-modifying cast.
1633
1634        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1635        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1636
1637        assert_eq!(casted_timestamps.len(), expected.len());
1638
1639        casted_timestamps
1640            .iter()
1641            .zip(expected.iter())
1642            .for_each(|(lhs, rhs)| {
1643                assert_eq!(lhs, rhs);
1644            });
1645    }
1646
1647    #[test]
1648    fn test_int96_from_spark_file_without_provided_schema() {
1649        // int96_from_spark.parquet was written with Spark's microsecond timestamps, which trade
1650        // resolution for range compared to a nanosecond timestamp. Without a provided schema, some
1651        // values overflow when read at nanosecond resolution and come back as garbage.
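        // For example, the year-9999 value is 253_402_225_200_000_000 us; converting it to
        // nanoseconds (x1000) exceeds i64::MAX and wraps modulo 2^64, which is exactly the
        // -4_852_191_831_933_722_624 that appears in `expected` below.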
1652        use arrow_schema::DataType::Timestamp;
1653        let test_data = arrow::util::test_util::parquet_test_data();
1654        let path = format!("{test_data}/int96_from_spark.parquet");
1655        let file = File::open(path).unwrap();
1656
1657        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1658            .unwrap()
1659            .build()
1660            .unwrap();
1661
1662        let batch = record_reader.next().unwrap().unwrap();
1663        assert_eq!(batch.num_columns(), 1);
1664        let column = batch.column(0);
1665        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1666
1667        let expected = Arc::new(Int64Array::from(vec![
1668            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1669            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1670            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1671            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1672            None,
1673            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1674        ]));
1675
1676        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1677        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1678        // anyway, so this should be a zero-copy non-modifying cast.
1679
1680        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1681        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1682
1683        assert_eq!(casted_timestamps.len(), expected.len());
1684
1685        casted_timestamps
1686            .iter()
1687            .zip(expected.iter())
1688            .for_each(|(lhs, rhs)| {
1689                assert_eq!(lhs, rhs);
1690            });
1691    }
1692
1693    struct RandUtf8Gen {}
1694
1695    impl RandGen<ByteArrayType> for RandUtf8Gen {
1696        fn gen(len: i32) -> ByteArray {
1697            Int32Type::gen(len).to_string().as_str().into()
1698        }
1699    }
1700
1701    #[test]
1702    fn test_utf8_single_column_reader_test() {
1703        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1704            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1705                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1706            })))
1707        }
1708
1709        let encodings = &[
1710            Encoding::PLAIN,
1711            Encoding::RLE_DICTIONARY,
1712            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1713            Encoding::DELTA_BYTE_ARRAY,
1714        ];
1715
1716        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1717            2,
1718            ConvertedType::NONE,
1719            None,
1720            |vals| {
1721                Arc::new(BinaryArray::from_iter(
1722                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1723                ))
1724            },
1725            encodings,
1726        );
1727
1728        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1729            2,
1730            ConvertedType::UTF8,
1731            None,
1732            string_converter::<i32>,
1733            encodings,
1734        );
1735
1736        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1737            2,
1738            ConvertedType::UTF8,
1739            Some(ArrowDataType::Utf8),
1740            string_converter::<i32>,
1741            encodings,
1742        );
1743
1744        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1745            2,
1746            ConvertedType::UTF8,
1747            Some(ArrowDataType::LargeUtf8),
1748            string_converter::<i64>,
1749            encodings,
1750        );
1751
1752        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1753        for key in &small_key_types {
1754            for encoding in encodings {
1755                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1756                opts.encoding = *encoding;
1757
1758                let data_type =
1759                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1760
1761                // Cannot run full test suite as keys overflow, run small test instead
1762                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1763                    opts,
1764                    2,
1765                    ConvertedType::UTF8,
1766                    Some(data_type.clone()),
1767                    move |vals| {
1768                        let vals = string_converter::<i32>(vals);
1769                        arrow::compute::cast(&vals, &data_type).unwrap()
1770                    },
1771                );
1772            }
1773        }
1774
1775        let key_types = [
1776            ArrowDataType::Int16,
1777            ArrowDataType::UInt16,
1778            ArrowDataType::Int32,
1779            ArrowDataType::UInt32,
1780            ArrowDataType::Int64,
1781            ArrowDataType::UInt64,
1782        ];
1783
1784        for key in &key_types {
1785            let data_type =
1786                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1787
1788            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1789                2,
1790                ConvertedType::UTF8,
1791                Some(data_type.clone()),
1792                move |vals| {
1793                    let vals = string_converter::<i32>(vals);
1794                    arrow::compute::cast(&vals, &data_type).unwrap()
1795                },
1796                encodings,
1797            );
1798
1799            // https://github.com/apache/arrow-rs/issues/1179
1800            // let data_type = ArrowDataType::Dictionary(
1801            //     Box::new(key.clone()),
1802            //     Box::new(ArrowDataType::LargeUtf8),
1803            // );
1804            //
1805            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1806            //     2,
1807            //     ConvertedType::UTF8,
1808            //     Some(data_type.clone()),
1809            //     move |vals| {
1810            //         let vals = string_converter::<i64>(vals);
1811            //         arrow::compute::cast(&vals, &data_type).unwrap()
1812            //     },
1813            //     encodings,
1814            // );
1815        }
1816    }
1817
1818    #[test]
1819    fn test_decimal_nullable_struct() {
1820        let decimals = Decimal256Array::from_iter_values(
1821            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1822        );
1823
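        // Validity mask 0b11101111 (LSB first) marks row 4 of the struct as null.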
1824        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1825            "decimals",
1826            decimals.data_type().clone(),
1827            false,
1828        )])))
1829        .len(8)
1830        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1831        .child_data(vec![decimals.into_data()])
1832        .build()
1833        .unwrap();
1834
1835        let written =
1836            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1837                .unwrap();
1838
1839        let mut buffer = Vec::with_capacity(1024);
1840        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1841        writer.write(&written).unwrap();
1842        writer.close().unwrap();
1843
1844        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1845            .unwrap()
1846            .collect::<Result<Vec<_>, _>>()
1847            .unwrap();
1848
1849        assert_eq!(&written.slice(0, 3), &read[0]);
1850        assert_eq!(&written.slice(3, 3), &read[1]);
1851        assert_eq!(&written.slice(6, 2), &read[2]);
1852    }
1853
1854    #[test]
1855    fn test_int32_nullable_struct() {
1856        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1857        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1858            "int32",
1859            int32.data_type().clone(),
1860            false,
1861        )])))
1862        .len(8)
1863        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1864        .child_data(vec![int32.into_data()])
1865        .build()
1866        .unwrap();
1867
1868        let written =
1869            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1870                .unwrap();
1871
1872        let mut buffer = Vec::with_capacity(1024);
1873        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1874        writer.write(&written).unwrap();
1875        writer.close().unwrap();
1876
1877        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1878            .unwrap()
1879            .collect::<Result<Vec<_>, _>>()
1880            .unwrap();
1881
1882        assert_eq!(&written.slice(0, 3), &read[0]);
1883        assert_eq!(&written.slice(3, 3), &read[1]);
1884        assert_eq!(&written.slice(6, 2), &read[2]);
1885    }
1886
1887    #[test]
1888    fn test_decimal_list() {
1889        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1890
1891        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
1892        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1893            decimals.data_type().clone(),
1894            false,
1895        ))))
1896        .len(7)
1897        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1898        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1899        .child_data(vec![decimals.into_data()])
1900        .build()
1901        .unwrap();
1902
1903        let written =
1904            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1905                .unwrap();
1906
1907        let mut buffer = Vec::with_capacity(1024);
1908        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1909        writer.write(&written).unwrap();
1910        writer.close().unwrap();
1911
1912        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1913            .unwrap()
1914            .collect::<Result<Vec<_>, _>>()
1915            .unwrap();
1916
1917        assert_eq!(&written.slice(0, 3), &read[0]);
1918        assert_eq!(&written.slice(3, 3), &read[1]);
1919        assert_eq!(&written.slice(6, 1), &read[2]);
1920    }
1921
1922    #[test]
1923    fn test_read_decimal_file() {
1924        use arrow_array::Decimal128Array;
1925        let testdata = arrow::util::test_util::parquet_test_data();
1926        let file_variants = vec![
1927            ("byte_array", 4),
1928            ("fixed_length", 25),
1929            ("int32", 4),
1930            ("int64", 10),
1931        ];
1932        for (prefix, target_precision) in file_variants {
1933            let path = format!("{testdata}/{prefix}_decimal.parquet");
1934            let file = File::open(path).unwrap();
1935            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1936
1937            let batch = record_reader.next().unwrap().unwrap();
1938            assert_eq!(batch.num_rows(), 24);
1939            let col = batch
1940                .column(0)
1941                .as_any()
1942                .downcast_ref::<Decimal128Array>()
1943                .unwrap();
1944
1945            let expected = 1..25;
1946
1947            assert_eq!(col.precision(), target_precision);
1948            assert_eq!(col.scale(), 2);
1949
1950            for (i, v) in expected.enumerate() {
1951                assert_eq!(col.value(i), v * 100_i128);
1952            }
1953        }
1954    }
1955
1956    #[test]
1957    fn test_read_float16_nonzeros_file() {
1958        use arrow_array::Float16Array;
1959        let testdata = arrow::util::test_util::parquet_test_data();
1960        // see https://github.com/apache/parquet-testing/pull/40
1961        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1962        let file = File::open(path).unwrap();
1963        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1964
1965        let batch = record_reader.next().unwrap().unwrap();
1966        assert_eq!(batch.num_rows(), 8);
1967        let col = batch
1968            .column(0)
1969            .as_any()
1970            .downcast_ref::<Float16Array>()
1971            .unwrap();
1972
1973        let f16_two = f16::ONE + f16::ONE;
1974
1975        assert_eq!(col.null_count(), 1);
1976        assert!(col.is_null(0));
1977        assert_eq!(col.value(1), f16::ONE);
1978        assert_eq!(col.value(2), -f16_two);
1979        assert!(col.value(3).is_nan());
1980        assert_eq!(col.value(4), f16::ZERO);
1981        assert!(col.value(4).is_sign_positive());
1982        assert_eq!(col.value(5), f16::NEG_ONE);
1983        assert_eq!(col.value(6), f16::NEG_ZERO);
1984        assert!(col.value(6).is_sign_negative());
1985        assert_eq!(col.value(7), f16_two);
1986    }
1987
1988    #[test]
1989    fn test_read_float16_zeros_file() {
1990        use arrow_array::Float16Array;
1991        let testdata = arrow::util::test_util::parquet_test_data();
1992        // see https://github.com/apache/parquet-testing/pull/40
1993        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1994        let file = File::open(path).unwrap();
1995        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1996
1997        let batch = record_reader.next().unwrap().unwrap();
1998        assert_eq!(batch.num_rows(), 3);
1999        let col = batch
2000            .column(0)
2001            .as_any()
2002            .downcast_ref::<Float16Array>()
2003            .unwrap();
2004
2005        assert_eq!(col.null_count(), 1);
2006        assert!(col.is_null(0));
2007        assert_eq!(col.value(1), f16::ZERO);
2008        assert!(col.value(1).is_sign_positive());
2009        assert!(col.value(2).is_nan());
2010    }
2011
2012    #[test]
2013    fn test_read_float32_float64_byte_stream_split() {
2014        let path = format!(
2015            "{}/byte_stream_split.zstd.parquet",
2016            arrow::util::test_util::parquet_test_data(),
2017        );
2018        let file = File::open(path).unwrap();
2019        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2020
2021        let mut row_count = 0;
2022        for batch in record_reader {
2023            let batch = batch.unwrap();
2024            row_count += batch.num_rows();
2025            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2026            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2027
2028            // This file contains floats from a standard normal distribution
2029            for &x in f32_col.values() {
2030                assert!(x > -10.0);
2031                assert!(x < 10.0);
2032            }
2033            for &x in f64_col.values() {
2034                assert!(x > -10.0);
2035                assert!(x < 10.0);
2036            }
2037        }
2038        assert_eq!(row_count, 300);
2039    }
2040
2041    #[test]
2042    fn test_read_extended_byte_stream_split() {
2043        let path = format!(
2044            "{}/byte_stream_split_extended.gzip.parquet",
2045            arrow::util::test_util::parquet_test_data(),
2046        );
2047        let file = File::open(path).unwrap();
2048        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2049
2050        let mut row_count = 0;
2051        for batch in record_reader {
2052            let batch = batch.unwrap();
2053            row_count += batch.num_rows();
2054
2055            // 0,1 are f16
2056            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2057            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2058            assert_eq!(f16_col.len(), f16_bss.len());
2059            f16_col
2060                .iter()
2061                .zip(f16_bss.iter())
2062                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2063
2064            // 2,3 are f32
2065            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2066            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2067            assert_eq!(f32_col.len(), f32_bss.len());
2068            f32_col
2069                .iter()
2070                .zip(f32_bss.iter())
2071                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2072
2073            // 4,5 are f64
2074            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2075            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2076            assert_eq!(f64_col.len(), f64_bss.len());
2077            f64_col
2078                .iter()
2079                .zip(f64_bss.iter())
2080                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2081
2082            // 6,7 are i32
2083            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2084            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2085            assert_eq!(i32_col.len(), i32_bss.len());
2086            i32_col
2087                .iter()
2088                .zip(i32_bss.iter())
2089                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2090
2091            // 8,9 are i64
2092            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2093            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2094            assert_eq!(i64_col.len(), i64_bss.len());
2095            i64_col
2096                .iter()
2097                .zip(i64_bss.iter())
2098                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2099
2100            // 10,11 are FLBA(5)
2101            let flba_col = batch.column(10).as_fixed_size_binary();
2102            let flba_bss = batch.column(11).as_fixed_size_binary();
2103            assert_eq!(flba_col.len(), flba_bss.len());
2104            flba_col
2105                .iter()
2106                .zip(flba_bss.iter())
2107                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2108
2109            // 12,13 are FLBA(4) (decimal(7,3))
2110            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2111            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2112            assert_eq!(dec_col.len(), dec_bss.len());
2113            dec_col
2114                .iter()
2115                .zip(dec_bss.iter())
2116                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2117        }
2118        assert_eq!(row_count, 200);
2119    }
2120
2121    #[test]
2122    fn test_read_incorrect_map_schema_file() {
2123        let testdata = arrow::util::test_util::parquet_test_data();
2124        // see https://github.com/apache/parquet-testing/pull/47
2125        let path = format!("{testdata}/incorrect_map_schema.parquet");
2126        let file = File::open(path).unwrap();
2127        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2128
2129        let batch = record_reader.next().unwrap().unwrap();
2130        assert_eq!(batch.num_rows(), 1);
2131
2132        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2133            "my_map",
2134            ArrowDataType::Map(
2135                Arc::new(Field::new(
2136                    "key_value",
2137                    ArrowDataType::Struct(Fields::from(vec![
2138                        Field::new("key", ArrowDataType::Utf8, false),
2139                        Field::new("value", ArrowDataType::Utf8, true),
2140                    ])),
2141                    false,
2142                )),
2143                false,
2144            ),
2145            true,
2146        )]));
2147        assert_eq!(batch.schema().as_ref(), &expected_schema);
2148
2149        assert_eq!(batch.num_rows(), 1);
2150        assert_eq!(batch.column(0).null_count(), 0);
2151        assert_eq!(
2152            batch.column(0).as_map().keys().as_ref(),
2153            &StringArray::from(vec!["parent", "name"])
2154        );
2155        assert_eq!(
2156            batch.column(0).as_map().values().as_ref(),
2157            &StringArray::from(vec!["another", "report"])
2158        );
2159    }
2160
2161    #[test]
2162    fn test_read_dict_fixed_size_binary() {
2163        let schema = Arc::new(Schema::new(vec![Field::new(
2164            "a",
2165            ArrowDataType::Dictionary(
2166                Box::new(ArrowDataType::UInt8),
2167                Box::new(ArrowDataType::FixedSizeBinary(8)),
2168            ),
2169            true,
2170        )]));
2171        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2172        let values = FixedSizeBinaryArray::try_from_iter(
2173            vec![
2174                (0u8..8u8).collect::<Vec<u8>>(),
2175                (24u8..32u8).collect::<Vec<u8>>(),
2176            ]
2177            .into_iter(),
2178        )
2179        .unwrap();
2180        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2181        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2182
2183        let mut buffer = Vec::with_capacity(1024);
2184        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2185        writer.write(&batch).unwrap();
2186        writer.close().unwrap();
2187        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2188            .unwrap()
2189            .collect::<Result<Vec<_>, _>>()
2190            .unwrap();
2191
2192        assert_eq!(read.len(), 1);
2193        assert_eq!(&batch, &read[0])
2194    }
2195
2196    /// Parameters for single_column_reader_test
2197    #[derive(Clone)]
2198    struct TestOptions {
2199        /// Number of row groups to write to parquet (total rows written =
2200        /// num_row_groups * num_rows)
2201        num_row_groups: usize,
2202        /// Number of rows in each row group
2203        num_rows: usize,
2204        /// Size of batches to read back
2205        record_batch_size: usize,
2206        /// Percentage of nulls in the column, or None if the column is required (non-null)
2207        null_percent: Option<usize>,
2208        /// Set write batch size
2209        ///
2210        /// This is the number of rows that are written at once to a page and
2211        /// therefore acts as a bound on the page granularity of a row group
2212        write_batch_size: usize,
2213        /// Maximum size of page in bytes
2214        max_data_page_size: usize,
2215        /// Maximum size of dictionary page in bytes
2216        max_dict_page_size: usize,
2217        /// Writer version
2218        writer_version: WriterVersion,
2219        /// Enabled statistics
2220        enabled_statistics: EnabledStatistics,
2221        /// Encoding
2222        encoding: Encoding,
2223        /// Row selections to apply, with the total selected row count
2224        row_selections: Option<(RowSelection, usize)>,
2225        /// Row filter (one bool per selected row)
2226        row_filter: Option<Vec<bool>>,
2227        /// Limit on the number of rows read
2228        limit: Option<usize>,
2229        /// Number of rows to skip before reading
2230        offset: Option<usize>,
2231    }
2232
2233    /// Manually implement `Debug` to avoid printing the entire contents of row_selections and row_filter
2234    impl std::fmt::Debug for TestOptions {
2235        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2236            f.debug_struct("TestOptions")
2237                .field("num_row_groups", &self.num_row_groups)
2238                .field("num_rows", &self.num_rows)
2239                .field("record_batch_size", &self.record_batch_size)
2240                .field("null_percent", &self.null_percent)
2241                .field("write_batch_size", &self.write_batch_size)
2242                .field("max_data_page_size", &self.max_data_page_size)
2243                .field("max_dict_page_size", &self.max_dict_page_size)
2244                .field("writer_version", &self.writer_version)
2245                .field("enabled_statistics", &self.enabled_statistics)
2246                .field("encoding", &self.encoding)
2247                .field("row_selections", &self.row_selections.is_some())
2248                .field("row_filter", &self.row_filter.is_some())
2249                .field("limit", &self.limit)
2250                .field("offset", &self.offset)
2251                .finish()
2252        }
2253    }
2254
2255    impl Default for TestOptions {
2256        fn default() -> Self {
2257            Self {
2258                num_row_groups: 2,
2259                num_rows: 100,
2260                record_batch_size: 15,
2261                null_percent: None,
2262                write_batch_size: 64,
2263                max_data_page_size: 1024 * 1024,
2264                max_dict_page_size: 1024 * 1024,
2265                writer_version: WriterVersion::PARQUET_1_0,
2266                enabled_statistics: EnabledStatistics::Page,
2267                encoding: Encoding::PLAIN,
2268                row_selections: None,
2269                row_filter: None,
2270                limit: None,
2271                offset: None,
2272            }
2273        }
2274    }
2275
2276    impl TestOptions {
2277        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2278            Self {
2279                num_row_groups,
2280                num_rows,
2281                record_batch_size,
2282                ..Default::default()
2283            }
2284        }
2285
2286        fn with_null_percent(self, null_percent: usize) -> Self {
2287            Self {
2288                null_percent: Some(null_percent),
2289                ..self
2290            }
2291        }
2292
2293        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2294            Self {
2295                max_data_page_size,
2296                ..self
2297            }
2298        }
2299
2300        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2301            Self {
2302                max_dict_page_size,
2303                ..self
2304            }
2305        }
2306
2307        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2308            Self {
2309                enabled_statistics,
2310                ..self
2311            }
2312        }
2313
2314        fn with_row_selections(self) -> Self {
2315            assert!(self.row_filter.is_none(), "Must set row selection first");
2316
2317            let mut rng = rng();
2318            let step = rng.random_range(self.record_batch_size..self.num_rows);
2319            let row_selections = create_test_selection(
2320                step,
2321                self.num_row_groups * self.num_rows,
2322                rng.random::<bool>(),
2323            );
2324            Self {
2325                row_selections: Some(row_selections),
2326                ..self
2327            }
2328        }
2329
2330        fn with_row_filter(self) -> Self {
2331            let row_count = match &self.row_selections {
2332                Some((_, count)) => *count,
2333                None => self.num_row_groups * self.num_rows,
2334            };
2335
2336            let mut rng = rng();
2337            Self {
2338                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2339                ..self
2340            }
2341        }
2342
2343        fn with_limit(self, limit: usize) -> Self {
2344            Self {
2345                limit: Some(limit),
2346                ..self
2347            }
2348        }
2349
2350        fn with_offset(self, offset: usize) -> Self {
2351            Self {
2352                offset: Some(offset),
2353                ..self
2354            }
2355        }
2356
2357        fn writer_props(&self) -> WriterProperties {
2358            let builder = WriterProperties::builder()
2359                .set_data_page_size_limit(self.max_data_page_size)
2360                .set_write_batch_size(self.write_batch_size)
2361                .set_writer_version(self.writer_version)
2362                .set_statistics_enabled(self.enabled_statistics);
2363
2364            let builder = match self.encoding {
2365                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2366                    .set_dictionary_enabled(true)
2367                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2368                _ => builder
2369                    .set_dictionary_enabled(false)
2370                    .set_encoding(self.encoding),
2371            };
2372
2373            builder.build()
2374        }
2375    }
2376
2377    /// Create parquet files and then read them back using
2378    /// `ParquetRecordBatchReaderBuilder`, exercising a standard set of
2379    /// `TestOptions` configurations.
2380    ///
2381    /// `rand_max` represents the maximum size of value to pass to the
2382    /// value generator
2383    fn run_single_column_reader_tests<T, F, G>(
2384        rand_max: i32,
2385        converted_type: ConvertedType,
2386        arrow_type: Option<ArrowDataType>,
2387        converter: F,
2388        encodings: &[Encoding],
2389    ) where
2390        T: DataType,
2391        G: RandGen<T>,
2392        F: Fn(&[Option<T::T>]) -> ArrayRef,
2393    {
2394        let all_options = vec![
2395            // choose record_batch_size (15) so batches cross row
2396            // group boundaries (2 row groups of 100 rows each).
2397            TestOptions::new(2, 100, 15),
2398            // choose record_batch_size (5) so batches sometimes fall
2399            // exactly on row group boundaries (3 row groups of 25
2400            // rows each). Tests buffer
2401            // refilling edge cases.
2402            TestOptions::new(3, 25, 5),
2403            // Choose record_batch_size (25) so batch boundaries line up
2404            // with row group boundaries (25 divides 100). Tests buffer
2405            // refilling edge cases.
2406            TestOptions::new(4, 100, 25),
2407            // Set maximum page size so row groups have multiple pages
2408            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2409            // Set small dictionary page size to test dictionary fallback
2410            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2411            // Test optional but with no nulls
2412            TestOptions::new(2, 256, 127).with_null_percent(0),
2413            // Test optional with nulls
2414            TestOptions::new(2, 256, 93).with_null_percent(25),
2415            // Test with limit of 0
2416            TestOptions::new(4, 100, 25).with_limit(0),
2417            // Test with limit of 50
2418            TestOptions::new(4, 100, 25).with_limit(50),
2419            // Test with limit of 10
2420            TestOptions::new(4, 100, 25).with_limit(10),
2421            // Test with limit of 101 (more than one 100-row row group)
2422            TestOptions::new(4, 100, 25).with_limit(101),
2423            // Test with offset of 30 and limit of 20
2424            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2425            // Test with offset + limit ending exactly on a row group boundary
2426            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2427            // Test with offset + limit ending one row past a row group boundary
2428            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2429            // Test with no page-level statistics
2430            TestOptions::new(2, 256, 91)
2431                .with_null_percent(25)
2432                .with_enabled_statistics(EnabledStatistics::Chunk),
2433            // Test with no statistics
2434            TestOptions::new(2, 256, 91)
2435                .with_null_percent(25)
2436                .with_enabled_statistics(EnabledStatistics::None),
2437            // Test with all null
2438            TestOptions::new(2, 128, 91)
2439                .with_null_percent(100)
2440                .with_enabled_statistics(EnabledStatistics::None),
2441            // Test skip
2442
2443            // choose record_batch_size (15) so batches cross row
2444            // group boundaries (2 row groups of 100 rows each).
2445            TestOptions::new(2, 100, 15).with_row_selections(),
2446            // choose record_batch_size (5) so batches sometimes fall
2447            // exactly on row group boundaries (3 row groups of 25
2448            // rows each). Tests buffer
2449            // refilling edge cases.
2450            TestOptions::new(3, 25, 5).with_row_selections(),
2451            // Choose record_batch_size (25) so batch boundaries line up
2452            // with row group boundaries (25 divides 100). Tests buffer
2453            // refilling edge cases.
2454            TestOptions::new(4, 100, 25).with_row_selections(),
2455            // Set maximum page size so row groups have multiple pages
2456            TestOptions::new(3, 256, 73)
2457                .with_max_data_page_size(128)
2458                .with_row_selections(),
2459            // Set small dictionary page size to test dictionary fallback
2460            TestOptions::new(3, 256, 57)
2461                .with_max_dict_page_size(128)
2462                .with_row_selections(),
2463            // Test optional but with no nulls
2464            TestOptions::new(2, 256, 127)
2465                .with_null_percent(0)
2466                .with_row_selections(),
2467            // Test optional with nulls
2468            TestOptions::new(2, 256, 93)
2469                .with_null_percent(25)
2470                .with_row_selections(),
2471            // Test optional with nulls and a limit
2472            TestOptions::new(2, 256, 93)
2473                .with_null_percent(25)
2474                .with_row_selections()
2475                .with_limit(10),
2476            // Test optional with nulls, an offset, and a limit
2477            TestOptions::new(2, 256, 93)
2478                .with_null_percent(25)
2479                .with_row_selections()
2480                .with_offset(20)
2481                .with_limit(10),
2482            // Test filter
2483
2484            // Test with row filter
2485            TestOptions::new(4, 100, 25).with_row_filter(),
2486            // Test with row selection and row filter
2487            TestOptions::new(4, 100, 25)
2488                .with_row_selections()
2489                .with_row_filter(),
2490            // Test with nulls, small pages, and row filter
2491            TestOptions::new(2, 256, 93)
2492                .with_null_percent(25)
2493                .with_max_data_page_size(10)
2494                .with_row_filter(),
2495            // Test with nulls, small pages, row selection, and row filter
2496            TestOptions::new(2, 256, 93)
2497                .with_null_percent(25)
2498                .with_max_data_page_size(10)
2499                .with_row_selections()
2500                .with_row_filter(),
2501            // Test with row selection and no offset index and small pages
2502            TestOptions::new(2, 256, 93)
2503                .with_enabled_statistics(EnabledStatistics::None)
2504                .with_max_data_page_size(10)
2505                .with_row_selections(),
2506        ];
2507
2508        all_options.into_iter().for_each(|opts| {
2509            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2510                for encoding in encodings {
2511                    let opts = TestOptions {
2512                        writer_version,
2513                        encoding: *encoding,
2514                        ..opts.clone()
2515                    };
2516
2517                    single_column_reader_test::<T, _, G>(
2518                        opts,
2519                        rand_max,
2520                        converted_type,
2521                        arrow_type.clone(),
2522                        &converter,
2523                    )
2524                }
2525            }
2526        });
2527    }
2528
2529    /// Create a parquet file and then read it back using
2530    /// `ParquetRecordBatchReaderBuilder`, configured with the parameters
2531    /// described in `opts`.
2532    fn single_column_reader_test<T, F, G>(
2533        opts: TestOptions,
2534        rand_max: i32,
2535        converted_type: ConvertedType,
2536        arrow_type: Option<ArrowDataType>,
2537        converter: F,
2538    ) where
2539        T: DataType,
2540        G: RandGen<T>,
2541        F: Fn(&[Option<T::T>]) -> ArrayRef,
2542    {
2543        // Print out options to facilitate debugging failures on CI
2544        println!(
2545            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2546            T::get_physical_type(), converted_type, arrow_type, opts
2547        );
2548
2549        // Generate def_levels according to null_percent
2550        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2551            Some(null_percent) => {
2552                let mut rng = rng();
2553
2554                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2555                    .map(|_| {
2556                        std::iter::from_fn(|| {
2557                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2558                        })
2559                        .take(opts.num_rows)
2560                        .collect()
2561                    })
2562                    .collect();
2563                (Repetition::OPTIONAL, Some(def_levels))
2564            }
2565            None => (Repetition::REQUIRED, None),
2566        };
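        // For this flat optional column a definition level of 1 means "value present" and 0 means
        // null, so the per-row-group value vectors generated below exclude the null slots.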
2567
2568        // Generate random table data
2569        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2570            .map(|idx| {
2571                let null_count = match def_levels.as_ref() {
2572                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2573                    None => 0,
2574                };
2575                G::gen_vec(rand_max, opts.num_rows - null_count)
2576            })
2577            .collect();
2578
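        // FIXED_LEN_BYTE_ARRAY columns use `rand_max` as the declared byte length and INT96 is
        // always 12 bytes; -1 effectively means "no length" for the remaining physical types.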
2579        let len = match T::get_physical_type() {
2580            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2581            crate::basic::Type::INT96 => 12,
2582            _ => -1,
2583        };
2584
2585        let fields = vec![Arc::new(
2586            Type::primitive_type_builder("leaf", T::get_physical_type())
2587                .with_repetition(repetition)
2588                .with_converted_type(converted_type)
2589                .with_length(len)
2590                .build()
2591                .unwrap(),
2592        )];
2593
2594        let schema = Arc::new(
2595            Type::group_type_builder("test_schema")
2596                .with_fields(fields)
2597                .build()
2598                .unwrap(),
2599        );
2600
2601        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2602
2603        let mut file = tempfile::tempfile().unwrap();
2604
2605        generate_single_column_file_with_data::<T>(
2606            &values,
2607            def_levels.as_ref(),
2608            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2609            schema,
2610            arrow_field,
2611            &opts,
2612        )
2613        .unwrap();
2614
2615        file.rewind().unwrap();
2616
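        // The page index is only requested for configurations that wrote page-level statistics;
        // the remaining configurations exercise the code paths that run without a page index.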
2617        let options = ArrowReaderOptions::new()
2618            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2619
2620        let mut builder =
2621            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2622
2623        let expected_data = match opts.row_selections {
2624            Some((selections, row_count)) => {
2625                let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2626                // Walk the selectors, dropping skipped rows and keeping the selected ones
2627                let mut selected_data: Vec<Option<T::T>> = vec![];
2628                let selectors: VecDeque<RowSelector> = selections.clone().into();
2629                for select in selectors {
2630                    if select.skip {
2631                        remaining_data.drain(0..select.row_count);
2632                    } else {
2633                        selected_data.extend(remaining_data.drain(0..select.row_count));
2634                    }
2635                }
2636                builder = builder.with_row_selection(selections);
2637
2638                assert_eq!(selected_data.len(), row_count);
2639                selected_data
2640            }
2641            None => {
2642                // Get the flattened table data
2643                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2644                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2645                expected_data
2646            }
2647        };
2648
2649        let mut expected_data = match opts.row_filter {
2650            Some(filter) => {
2651                let expected_data = expected_data
2652                    .into_iter()
2653                    .zip(filter.iter())
2654                    .filter_map(|(d, f)| f.then_some(d))
2655                    .collect();
2656
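                // The predicate below is stateful: `filter_offset` tracks how many rows earlier
                // batches consumed, so each incoming RecordBatch is matched against its own slice
                // of the precomputed boolean filter.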
2657                let mut filter_offset = 0;
2658                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2659                    ProjectionMask::all(),
2660                    move |b| {
2661                        let array = BooleanArray::from_iter(
2662                            filter
2663                                .iter()
2664                                .skip(filter_offset)
2665                                .take(b.num_rows())
2666                                .map(|x| Some(*x)),
2667                        );
2668                        filter_offset += b.num_rows();
2669                        Ok(array)
2670                    },
2671                ))]);
2672
2673                builder = builder.with_row_filter(filter);
2674                expected_data
2675            }
2676            None => expected_data,
2677        };
2678
2679        if let Some(offset) = opts.offset {
2680            builder = builder.with_offset(offset);
2681            expected_data = expected_data.into_iter().skip(offset).collect();
2682        }
2683
2684        if let Some(limit) = opts.limit {
2685            builder = builder.with_limit(limit);
2686            expected_data = expected_data.into_iter().take(limit).collect();
2687        }
2688
2689        let mut record_reader = builder
2690            .with_batch_size(opts.record_batch_size)
2691            .build()
2692            .unwrap();
2693
2694        let mut total_read = 0;
2695        loop {
2696            let maybe_batch = record_reader.next();
2697            if total_read < expected_data.len() {
2698                let end = min(total_read + opts.record_batch_size, expected_data.len());
2699                let batch = maybe_batch.unwrap().unwrap();
2700                assert_eq!(end - total_read, batch.num_rows());
2701
2702                let a = converter(&expected_data[total_read..end]);
2703                let b = Arc::clone(batch.column(0));
2704
2705                assert_eq!(a.data_type(), b.data_type());
2706                assert_eq!(a.to_data(), b.to_data());
2707                assert_eq!(
2708                    a.as_any().type_id(),
2709                    b.as_any().type_id(),
2710                    "incorrect type ids"
2711                );
2712
2713                total_read = end;
2714            } else {
2715                assert!(maybe_batch.is_none());
2716                break;
2717            }
2718        }
2719    }
2720
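    // For example, def_levels Some([[1, 0, 1]]) with values [[a, b]] flattens to
    // [Some(a), None, Some(b)].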
2721    fn gen_expected_data<T: DataType>(
2722        def_levels: Option<&Vec<Vec<i16>>>,
2723        values: &[Vec<T::T>],
2724    ) -> Vec<Option<T::T>> {
2725        let data: Vec<Option<T::T>> = match def_levels {
2726            Some(levels) => {
2727                let mut values_iter = values.iter().flatten();
2728                levels
2729                    .iter()
2730                    .flatten()
2731                    .map(|d| match d {
2732                        1 => Some(values_iter.next().cloned().unwrap()),
2733                        0 => None,
2734                        _ => unreachable!(),
2735                    })
2736                    .collect()
2737            }
2738            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2739        };
2740        data
2741    }
2742
2743    fn generate_single_column_file_with_data<T: DataType>(
2744        values: &[Vec<T::T>],
2745        def_levels: Option<&Vec<Vec<i16>>>,
2746        file: File,
2747        schema: TypePtr,
2748        field: Option<Field>,
2749        opts: &TestOptions,
2750    ) -> Result<crate::format::FileMetaData> {
2751        let mut writer_props = opts.writer_props();
2752        if let Some(field) = field {
2753            let arrow_schema = Schema::new(vec![field]);
2754            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2755        }
2756
2757        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2758
2759        for (idx, v) in values.iter().enumerate() {
2760            let def_levels = def_levels.map(|d| d[idx].as_slice());
2761            let mut row_group_writer = writer.next_row_group()?;
2762            {
2763                let mut column_writer = row_group_writer
2764                    .next_column()?
2765                    .expect("Column writer is none!");
2766
2767                column_writer
2768                    .typed::<T>()
2769                    .write_batch(v, def_levels, None)?;
2770
2771                column_writer.close()?;
2772            }
2773            row_group_writer.close()?;
2774        }
2775
2776        writer.close()
2777    }
2778
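    /// Opens `file_name` from the arrow test data directory.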
2779    fn get_test_file(file_name: &str) -> File {
2780        let mut path = PathBuf::new();
2781        path.push(arrow::util::test_util::arrow_test_data());
2782        path.push(file_name);
2783
2784        File::open(path.as_path()).expect("File not found!")
2785    }
2786
2787    #[test]
2788    fn test_read_structs() {
2789        // This particular test file has columns of struct types where there is
2790        // a column that has the same name as one of the struct fields
2791        // (see: ARROW-11452)
2792        let testdata = arrow::util::test_util::parquet_test_data();
2793        let path = format!("{testdata}/nested_structs.rust.parquet");
2794        let file = File::open(&path).unwrap();
2795        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2796
2797        for batch in record_batch_reader {
2798            batch.unwrap();
2799        }
2800
2801        let file = File::open(&path).unwrap();
2802        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2803
2804        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2805        let projected_reader = builder
2806            .with_projection(mask)
2807            .with_batch_size(60)
2808            .build()
2809            .unwrap();
2810
2811        let expected_schema = Schema::new(vec![
2812            Field::new(
2813                "roll_num",
2814                ArrowDataType::Struct(Fields::from(vec![Field::new(
2815                    "count",
2816                    ArrowDataType::UInt64,
2817                    false,
2818                )])),
2819                false,
2820            ),
2821            Field::new(
2822                "PC_CUR",
2823                ArrowDataType::Struct(Fields::from(vec![
2824                    Field::new("mean", ArrowDataType::Int64, false),
2825                    Field::new("sum", ArrowDataType::Int64, false),
2826                ])),
2827                false,
2828            ),
2829        ]);
2830
2831        // Tests for #1652 and #1654
2832        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2833
2834        for batch in projected_reader {
2835            let batch = batch.unwrap();
2836            assert_eq!(batch.schema().as_ref(), &expected_schema);
2837        }
2838    }
2839
2840    #[test]
2841    // same as test_read_structs but constructs projection mask via column names
2842    fn test_read_structs_by_name() {
2843        let testdata = arrow::util::test_util::parquet_test_data();
2844        let path = format!("{testdata}/nested_structs.rust.parquet");
2845        let file = File::open(&path).unwrap();
2846        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2847
2848        for batch in record_batch_reader {
2849            batch.unwrap();
2850        }
2851
2852        let file = File::open(&path).unwrap();
2853        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2854
2855        let mask = ProjectionMask::columns(
2856            builder.parquet_schema(),
2857            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2858        );
2859        let projected_reader = builder
2860            .with_projection(mask)
2861            .with_batch_size(60)
2862            .build()
2863            .unwrap();
2864
2865        let expected_schema = Schema::new(vec![
2866            Field::new(
2867                "roll_num",
2868                ArrowDataType::Struct(Fields::from(vec![Field::new(
2869                    "count",
2870                    ArrowDataType::UInt64,
2871                    false,
2872                )])),
2873                false,
2874            ),
2875            Field::new(
2876                "PC_CUR",
2877                ArrowDataType::Struct(Fields::from(vec![
2878                    Field::new("mean", ArrowDataType::Int64, false),
2879                    Field::new("sum", ArrowDataType::Int64, false),
2880                ])),
2881                false,
2882            ),
2883        ]);
2884
2885        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2886
2887        for batch in projected_reader {
2888            let batch = batch.unwrap();
2889            assert_eq!(batch.schema().as_ref(), &expected_schema);
2890        }
2891    }
2892
2893    #[test]
2894    fn test_read_maps() {
2895        let testdata = arrow::util::test_util::parquet_test_data();
2896        let path = format!("{testdata}/nested_maps.snappy.parquet");
2897        let file = File::open(path).unwrap();
2898        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2899
2900        for batch in record_batch_reader {
2901            batch.unwrap();
2902        }
2903    }
2904
2905    #[test]
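    // An optional group containing a required leaf should be read back as a nullable
    // struct column whose child field is non-nullable.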
2906    fn test_nested_nullability() {
2907        let message_type = "message nested {
2908          OPTIONAL Group group {
2909            REQUIRED INT32 leaf;
2910          }
2911        }";
2912
2913        let file = tempfile::tempfile().unwrap();
2914        let schema = Arc::new(parse_message_type(message_type).unwrap());
2915
2916        {
2917            // Write using low-level parquet API (#1167)
2918            let mut writer =
2919                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2920                    .unwrap();
2921
2922            {
2923                let mut row_group_writer = writer.next_row_group().unwrap();
2924                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2925
2926                column_writer
2927                    .typed::<Int32Type>()
2928                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2929                    .unwrap();
2930
2931                column_writer.close().unwrap();
2932                row_group_writer.close().unwrap();
2933            }
2934
2935            writer.close().unwrap();
2936        }
2937
2938        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2939        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2940
2941        let reader = builder.with_projection(mask).build().unwrap();
2942
2943        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2944            "group",
2945            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2946            true,
2947        )]));
2948
2949        let batch = reader.into_iter().next().unwrap().unwrap();
2950        assert_eq!(batch.schema().as_ref(), &expected_schema);
2951        assert_eq!(batch.num_rows(), 4);
2952        assert_eq!(batch.column(0).null_count(), 2);
2953    }
2954
2955    #[test]
2956    fn test_invalid_utf8() {
2957        // a parquet file with 1 column containing invalid utf8 data
2958        let data = vec![
2959            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2960            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2961            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2962            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2963            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2964            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2965            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2966            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2967            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2968            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2969            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2970            82, 49,
2971        ];
2972
2973        let file = Bytes::from(data);
2974        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2975
2976        let error = record_batch_reader.next().unwrap().unwrap_err();
2977
2978        assert!(
2979            error.to_string().contains("invalid utf-8 sequence"),
2980            "{}",
2981            error
2982        );
2983    }
2984
2985    #[test]
2986    fn test_invalid_utf8_string_array() {
2987        test_invalid_utf8_string_array_inner::<i32>();
2988    }
2989
2990    #[test]
2991    fn test_invalid_utf8_large_string_array() {
2992        test_invalid_utf8_string_array_inner::<i64>();
2993    }
2994
2995    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2996        let cases = [
2997            invalid_utf8_first_char::<O>(),
2998            invalid_utf8_first_char_long_strings::<O>(),
2999            invalid_utf8_later_char::<O>(),
3000            invalid_utf8_later_char_long_strings::<O>(),
3001            invalid_utf8_later_char_really_long_strings::<O>(),
3002            invalid_utf8_later_char_really_long_strings2::<O>(),
3003        ];
3004        for array in &cases {
3005            for encoding in STRING_ENCODINGS {
3006                // the data is not valid utf8, so we cannot safely construct a correct
3007                // StringArray; purposely create an invalid StringArray instead
3008                let array = unsafe {
3009                    GenericStringArray::<O>::new_unchecked(
3010                        array.offsets().clone(),
3011                        array.values().clone(),
3012                        array.nulls().cloned(),
3013                    )
3014                };
3015                let data_type = array.data_type().clone();
3016                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3017                let err = read_from_parquet(data).unwrap_err();
3018                let expected_err =
3019                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3020                assert!(
3021                    err.to_string().contains(expected_err),
3022                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3023                );
3024            }
3025        }
3026    }
3027
3028    #[test]
3029    fn test_invalid_utf8_string_view_array() {
3030        let cases = [
3031            invalid_utf8_first_char::<i32>(),
3032            invalid_utf8_first_char_long_strings::<i32>(),
3033            invalid_utf8_later_char::<i32>(),
3034            invalid_utf8_later_char_long_strings::<i32>(),
3035            invalid_utf8_later_char_really_long_strings::<i32>(),
3036            invalid_utf8_later_char_really_long_strings2::<i32>(),
3037        ];
3038
3039        for encoding in STRING_ENCODINGS {
3040            for array in &cases {
3041                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3042                let array = array.as_binary_view();
3043
3044                // the data is not valid utf8, so we cannot safely construct a correct
3045                // StringViewArray; purposely create an invalid one instead
3046                let array = unsafe {
3047                    StringViewArray::new_unchecked(
3048                        array.views().clone(),
3049                        array.data_buffers().to_vec(),
3050                        array.nulls().cloned(),
3051                    )
3052                };
3053
3054                let data_type = array.data_type().clone();
3055                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3056                let err = read_from_parquet(data).unwrap_err();
3057                let expected_err =
3058                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3059                assert!(
3060                    err.to_string().contains(expected_err),
3061                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3062                );
3063            }
3064        }
3065    }
3066
3067    /// Encodings suitable for string data
3068    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3069        None,
3070        Some(Encoding::PLAIN),
3071        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3072        Some(Encoding::DELTA_BYTE_ARRAY),
3073    ];
3074
3075    /// Invalid UTF-8 sequence in the first character
3076    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3077    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3078
3079    /// Invalid UTF-8 sequence NOT in the first character
3080    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3081    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3082
3083    /// returns a BinaryArray with invalid UTF8 data in the first character
3084    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3085        let valid: &[u8] = b"   ";
3086        let invalid = INVALID_UTF8_FIRST_CHAR;
3087        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3088    }
3089
3090    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3091    /// string larger than 12 bytes which is handled specially when reading
3092    /// `ByteViewArray`s
3093    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3094        let valid: &[u8] = b"   ";
3095        let mut invalid = vec![];
3096        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3097        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3098        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3099    }
3100
3101    /// returns a BinaryArray with invalid UTF8 data in a character other than
3102    /// the first (this is checked in a special codepath)
3103    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3104        let valid: &[u8] = b"   ";
3105        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3106        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3107    }
3108
3109    /// returns a BinaryArray with invalid UTF8 data in a character other than
3110    /// the first in a string larger than 12 bytes which is handled specially
3111    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3112    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3113        let valid: &[u8] = b"   ";
3114        let mut invalid = vec![];
3115        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3116        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3117        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3118    }
3119
3120    /// returns a BinaryArray with invalid UTF8 data in a character other than
3121    /// the first in a string larger than 128 bytes which is handled specially
3122    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3123    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3124        let valid: &[u8] = b"   ";
3125        let mut invalid = vec![];
3126        for _ in 0..10 {
3127            // each instance is 38 bytes
3128            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3129        }
3130        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3131        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3132    }
3133
3134    /// returns a BinaryArray with a short invalid UTF8 string (invalid in a character
3135    /// other than the first) followed by a valid string larger than 128 bytes
3136    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3137        let valid: &[u8] = b"   ";
3138        let mut valid_long = vec![];
3139        for _ in 0..10 {
3140            // each instance is 38 bytes
3141            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3142        }
3143        let invalid = INVALID_UTF8_LATER_CHAR;
3144        GenericBinaryArray::<O>::from_iter(vec![
3145            None,
3146            Some(valid),
3147            Some(invalid),
3148            None,
3149            Some(&valid_long),
3150            Some(valid),
3151        ])
3152    }
3153
3154    /// writes the array into a single column parquet file with the specified
3155    /// encoding.
3156    ///
3157    /// If no encoding is specified, the default (dictionary) encoding is used
3158    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3159        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3160        let mut data = vec![];
3161        let schema = batch.schema();
3162        let props = encoding.map(|encoding| {
3163            WriterProperties::builder()
3164                // dictionary encoding must be disabled for the requested encoding to actually be used
3165                .set_dictionary_enabled(false)
3166                .set_encoding(encoding)
3167                .build()
3168        });
3169
3170        {
3171            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3172            writer.write(&batch).unwrap();
3173            writer.flush().unwrap();
3174            writer.close().unwrap();
3175        };
3176        data
3177    }
3178
3179    /// reads the parquet data back into record batches
3180    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3181        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3182            .unwrap()
3183            .build()
3184            .unwrap();
3185
3186        reader.collect()
3187    }
3188
3189    #[test]
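    // Batches decoded from the same row group should share the same dictionary,
    // while batches spanning row groups get a newly computed dictionary.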
3190    fn test_dictionary_preservation() {
3191        let fields = vec![Arc::new(
3192            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3193                .with_repetition(Repetition::OPTIONAL)
3194                .with_converted_type(ConvertedType::UTF8)
3195                .build()
3196                .unwrap(),
3197        )];
3198
3199        let schema = Arc::new(
3200            Type::group_type_builder("test_schema")
3201                .with_fields(fields)
3202                .build()
3203                .unwrap(),
3204        );
3205
3206        let dict_type = ArrowDataType::Dictionary(
3207            Box::new(ArrowDataType::Int32),
3208            Box::new(ArrowDataType::Utf8),
3209        );
3210
3211        let arrow_field = Field::new("leaf", dict_type, true);
3212
3213        let mut file = tempfile::tempfile().unwrap();
3214
3215        let values = vec![
3216            vec![
3217                ByteArray::from("hello"),
3218                ByteArray::from("a"),
3219                ByteArray::from("b"),
3220                ByteArray::from("d"),
3221            ],
3222            vec![
3223                ByteArray::from("c"),
3224                ByteArray::from("a"),
3225                ByteArray::from("b"),
3226            ],
3227        ];
3228
3229        let def_levels = vec![
3230            vec![1, 0, 0, 1, 0, 0, 1, 1],
3231            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3232        ];
3233
3234        let opts = TestOptions {
3235            encoding: Encoding::RLE_DICTIONARY,
3236            ..Default::default()
3237        };
3238
3239        generate_single_column_file_with_data::<ByteArrayType>(
3240            &values,
3241            Some(&def_levels),
3242            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3243            schema,
3244            Some(arrow_field),
3245            &opts,
3246        )
3247        .unwrap();
3248
3249        file.rewind().unwrap();
3250
3251        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3252
3253        let batches = record_reader
3254            .collect::<Result<Vec<RecordBatch>, _>>()
3255            .unwrap();
3256
3257        assert_eq!(batches.len(), 6);
3258        assert!(batches.iter().all(|x| x.num_columns() == 1));
3259
3260        let row_counts = batches
3261            .iter()
3262            .map(|x| (x.num_rows(), x.column(0).null_count()))
3263            .collect::<Vec<_>>();
3264
3265        assert_eq!(
3266            row_counts,
3267            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3268        );
3269
3270        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3271
3272        // First and second batch in same row group -> same dictionary
3273        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3274        // Third batch spans row group -> computed dictionary
3275        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3276        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3277        // Fourth, fifth and sixth from same row group -> same dictionary
3278        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3279        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3280    }
3281
3282    #[test]
3283    fn test_read_null_list() {
3284        let testdata = arrow::util::test_util::parquet_test_data();
3285        let path = format!("{testdata}/null_list.parquet");
3286        let file = File::open(path).unwrap();
3287        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3288
3289        let batch = record_batch_reader.next().unwrap().unwrap();
3290        assert_eq!(batch.num_rows(), 1);
3291        assert_eq!(batch.num_columns(), 1);
3292        assert_eq!(batch.column(0).len(), 1);
3293
3294        let list = batch
3295            .column(0)
3296            .as_any()
3297            .downcast_ref::<ListArray>()
3298            .unwrap();
3299        assert_eq!(list.len(), 1);
3300        assert!(list.is_valid(0));
3301
3302        let val = list.value(0);
3303        assert_eq!(val.len(), 0);
3304    }
3305
3306    #[test]
3307    fn test_null_schema_inference() {
3308        let testdata = arrow::util::test_util::parquet_test_data();
3309        let path = format!("{testdata}/null_list.parquet");
3310        let file = File::open(path).unwrap();
3311
3312        let arrow_field = Field::new(
3313            "emptylist",
3314            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3315            true,
3316        );
3317
3318        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3319        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3320        let schema = builder.schema();
3321        assert_eq!(schema.fields().len(), 1);
3322        assert_eq!(schema.field(0), &arrow_field);
3323    }
3324
3325    #[test]
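    // With `skip_arrow_metadata` enabled the embedded Arrow schema is ignored,
    // so the field metadata written by the writer should not be surfaced.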
3326    fn test_skip_metadata() {
3327        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3328        let field = Field::new("col", col.data_type().clone(), true);
3329
3330        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3331
3332        let metadata = [("key".to_string(), "value".to_string())]
3333            .into_iter()
3334            .collect();
3335
3336        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3337
3338        assert_ne!(schema_with_metadata, schema_without_metadata);
3339
3340        let batch =
3341            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3342
3343        let file = |version: WriterVersion| {
3344            let props = WriterProperties::builder()
3345                .set_writer_version(version)
3346                .build();
3347
3348            let file = tempfile().unwrap();
3349            let mut writer =
3350                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3351                    .unwrap();
3352            writer.write(&batch).unwrap();
3353            writer.close().unwrap();
3354            file
3355        };
3356
3357        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3358
3359        let v1_reader = file(WriterVersion::PARQUET_1_0);
3360        let v2_reader = file(WriterVersion::PARQUET_2_0);
3361
3362        let arrow_reader =
3363            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3364        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3365
3366        let reader =
3367            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3368                .unwrap()
3369                .build()
3370                .unwrap();
3371        assert_eq!(reader.schema(), schema_without_metadata);
3372
3373        let arrow_reader =
3374            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3375        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3376
3377        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3378            .unwrap()
3379            .build()
3380            .unwrap();
3381        assert_eq!(reader.schema(), schema_without_metadata);
3382    }
3383
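    /// Writes the given (column name, array) pairs to a temporary parquet file
    /// using `ArrowWriter` and returns the file handle.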
3384    fn write_parquet_from_iter<I, F>(value: I) -> File
3385    where
3386        I: IntoIterator<Item = (F, ArrayRef)>,
3387        F: AsRef<str>,
3388    {
3389        let batch = RecordBatch::try_from_iter(value).unwrap();
3390        let file = tempfile().unwrap();
3391        let mut writer =
3392            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3393        writer.write(&batch).unwrap();
3394        writer.close().unwrap();
3395        file
3396    }
3397
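    /// Writes `value` to a parquet file, then asserts that constructing a reader
    /// with the supplied `schema` fails with `expected_error`.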
3398    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3399    where
3400        I: IntoIterator<Item = (F, ArrayRef)>,
3401        F: AsRef<str>,
3402    {
3403        let file = write_parquet_from_iter(value);
3404        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3405        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3406            file.try_clone().unwrap(),
3407            options_with_schema,
3408        );
3409        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3410    }
3411
3412    #[test]
3413    fn test_schema_too_few_columns() {
3414        run_schema_test_with_error(
3415            vec![
3416                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3417                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3418            ],
3419            Arc::new(Schema::new(vec![Field::new(
3420                "int64",
3421                ArrowDataType::Int64,
3422                false,
3423            )])),
3424            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3425        );
3426    }
3427
3428    #[test]
3429    fn test_schema_too_many_columns() {
3430        run_schema_test_with_error(
3431            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3432            Arc::new(Schema::new(vec![
3433                Field::new("int64", ArrowDataType::Int64, false),
3434                Field::new("int32", ArrowDataType::Int32, false),
3435            ])),
3436            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3437        );
3438    }
3439
3440    #[test]
3441    fn test_schema_mismatched_column_names() {
3442        run_schema_test_with_error(
3443            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3444            Arc::new(Schema::new(vec![Field::new(
3445                "other",
3446                ArrowDataType::Int64,
3447                false,
3448            )])),
3449            "Arrow: incompatible arrow schema, expected field named int64 got other",
3450        );
3451    }
3452
3453    #[test]
3454    fn test_schema_incompatible_columns() {
3455        run_schema_test_with_error(
3456            vec![
3457                (
3458                    "col1_invalid",
3459                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3460                ),
3461                (
3462                    "col2_valid",
3463                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3464                ),
3465                (
3466                    "col3_invalid",
3467                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3468                ),
3469            ],
3470            Arc::new(Schema::new(vec![
3471                Field::new("col1_invalid", ArrowDataType::Int32, false),
3472                Field::new("col2_valid", ArrowDataType::Int32, false),
3473                Field::new("col3_invalid", ArrowDataType::Int32, false),
3474            ])),
3475            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3476        );
3477    }
3478
3479    #[test]
3480    fn test_one_incompatible_nested_column() {
3481        let nested_fields = Fields::from(vec![
3482            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3483            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3484        ]);
3485        let nested = StructArray::try_new(
3486            nested_fields,
3487            vec![
3488                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3489                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3490            ],
3491            None,
3492        )
3493        .expect("struct array");
3494        let supplied_nested_fields = Fields::from(vec![
3495            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3496            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3497        ]);
3498        run_schema_test_with_error(
3499            vec![
3500                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3501                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3502                ("nested", Arc::new(nested) as ArrayRef),
3503            ],
3504            Arc::new(Schema::new(vec![
3505                Field::new("col1", ArrowDataType::Int64, false),
3506                Field::new("col2", ArrowDataType::Int32, false),
3507                Field::new(
3508                    "nested",
3509                    ArrowDataType::Struct(supplied_nested_fields),
3510                    false,
3511                ),
3512            ])),
3513            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3514            requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3515            but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3516        );
3517    }
3518
3519    /// Return parquet data with a single column of utf8 strings
3520    fn utf8_parquet() -> Bytes {
3521        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3522        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3523        let props = None;
3524        // write a parquet file with non-nullable strings
3525        let mut parquet_data = vec![];
3526        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3527        writer.write(&batch).unwrap();
3528        writer.close().unwrap();
3529        Bytes::from(parquet_data)
3530    }
3531
3532    #[test]
3533    fn test_schema_error_bad_types() {
3534        // verify incompatible schemas error on read
3535        let parquet_data = utf8_parquet();
3536
3537        // Ask to read it back with an incompatible schema (int vs string)
3538        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3539            "column1",
3540            arrow::datatypes::DataType::Int32,
3541            false,
3542        )]));
3543
3544        // read it back out
3545        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3546        let err =
3547            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3548                .unwrap_err();
3549        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3550    }
3551
3552    #[test]
3553    fn test_schema_error_bad_nullability() {
3554        // verify incompatible schemas error on read
3555        let parquet_data = utf8_parquet();
3556
3557        // Ask to read it back with an incompatible schema (nullability mismatch)
3558        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3559            "column1",
3560            arrow::datatypes::DataType::Utf8,
3561            true,
3562        )]));
3563
3564        // read it back out
3565        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3566        let err =
3567            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3568                .unwrap_err();
3569        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3570    }
3571
3572    #[test]
3573    fn test_read_binary_as_utf8() {
3574        let file = write_parquet_from_iter(vec![
3575            (
3576                "binary_to_utf8",
3577                Arc::new(BinaryArray::from(vec![
3578                    b"one".as_ref(),
3579                    b"two".as_ref(),
3580                    b"three".as_ref(),
3581                ])) as ArrayRef,
3582            ),
3583            (
3584                "large_binary_to_large_utf8",
3585                Arc::new(LargeBinaryArray::from(vec![
3586                    b"one".as_ref(),
3587                    b"two".as_ref(),
3588                    b"three".as_ref(),
3589                ])) as ArrayRef,
3590            ),
3591            (
3592                "binary_view_to_utf8_view",
3593                Arc::new(BinaryViewArray::from(vec![
3594                    b"one".as_ref(),
3595                    b"two".as_ref(),
3596                    b"three".as_ref(),
3597                ])) as ArrayRef,
3598            ),
3599        ]);
3600        let supplied_fields = Fields::from(vec![
3601            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3602            Field::new(
3603                "large_binary_to_large_utf8",
3604                ArrowDataType::LargeUtf8,
3605                false,
3606            ),
3607            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3608        ]);
3609
3610        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3611        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3612            file.try_clone().unwrap(),
3613            options,
3614        )
3615        .expect("reader builder with schema")
3616        .build()
3617        .expect("reader with schema");
3618
3619        let batch = arrow_reader.next().unwrap().unwrap();
3620        assert_eq!(batch.num_columns(), 3);
3621        assert_eq!(batch.num_rows(), 3);
3622        assert_eq!(
3623            batch
3624                .column(0)
3625                .as_string::<i32>()
3626                .iter()
3627                .collect::<Vec<_>>(),
3628            vec![Some("one"), Some("two"), Some("three")]
3629        );
3630
3631        assert_eq!(
3632            batch
3633                .column(1)
3634                .as_string::<i64>()
3635                .iter()
3636                .collect::<Vec<_>>(),
3637            vec![Some("one"), Some("two"), Some("three")]
3638        );
3639
3640        assert_eq!(
3641            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3642            vec![Some("one"), Some("two"), Some("three")]
3643        );
3644    }
3645
3646    #[test]
3647    #[should_panic(expected = "Invalid UTF8 sequence at")]
3648    fn test_read_non_utf8_binary_as_utf8() {
3649        let file = write_parquet_from_iter(vec![(
3650            "non_utf8_binary",
3651            Arc::new(BinaryArray::from(vec![
3652                b"\xDE\x00\xFF".as_ref(),
3653                b"\xDE\x01\xAA".as_ref(),
3654                b"\xDE\x02\xFF".as_ref(),
3655            ])) as ArrayRef,
3656        )]);
3657        let supplied_fields = Fields::from(vec![Field::new(
3658            "non_utf8_binary",
3659            ArrowDataType::Utf8,
3660            false,
3661        )]);
3662
3663        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3664        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3665            file.try_clone().unwrap(),
3666            options,
3667        )
3668        .expect("reader builder with schema")
3669        .build()
3670        .expect("reader with schema");
3671        arrow_reader.next().unwrap().unwrap_err();
3672    }
3673
3674    #[test]
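    // A schema supplied via `ArrowReaderOptions::with_schema` should coerce columns
    // to the requested types (timestamps with timezones, Date64, dictionary encoded
    // strings), including fields nested inside structs.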
3675    fn test_with_schema() {
3676        let nested_fields = Fields::from(vec![
3677            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3678            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3679        ]);
3680
3681        let nested_arrays: Vec<ArrayRef> = vec![
3682            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3683            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3684        ];
3685
3686        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3687
3688        let file = write_parquet_from_iter(vec![
3689            (
3690                "int32_to_ts_second",
3691                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3692            ),
3693            (
3694                "date32_to_date64",
3695                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3696            ),
3697            ("nested", Arc::new(nested) as ArrayRef),
3698        ]);
3699
3700        let supplied_nested_fields = Fields::from(vec![
3701            Field::new(
3702                "utf8_to_dict",
3703                ArrowDataType::Dictionary(
3704                    Box::new(ArrowDataType::Int32),
3705                    Box::new(ArrowDataType::Utf8),
3706                ),
3707                false,
3708            ),
3709            Field::new(
3710                "int64_to_ts_nano",
3711                ArrowDataType::Timestamp(
3712                    arrow::datatypes::TimeUnit::Nanosecond,
3713                    Some("+10:00".into()),
3714                ),
3715                false,
3716            ),
3717        ]);
3718
3719        let supplied_schema = Arc::new(Schema::new(vec![
3720            Field::new(
3721                "int32_to_ts_second",
3722                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3723                false,
3724            ),
3725            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3726            Field::new(
3727                "nested",
3728                ArrowDataType::Struct(supplied_nested_fields),
3729                false,
3730            ),
3731        ]));
3732
3733        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3734        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3735            file.try_clone().unwrap(),
3736            options,
3737        )
3738        .expect("reader builder with schema")
3739        .build()
3740        .expect("reader with schema");
3741
3742        assert_eq!(arrow_reader.schema(), supplied_schema);
3743        let batch = arrow_reader.next().unwrap().unwrap();
3744        assert_eq!(batch.num_columns(), 3);
3745        assert_eq!(batch.num_rows(), 4);
3746        assert_eq!(
3747            batch
3748                .column(0)
3749                .as_any()
3750                .downcast_ref::<TimestampSecondArray>()
3751                .expect("downcast to timestamp second")
3752                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3753                .map(|v| v.to_string())
3754                .expect("value as datetime"),
3755            "1970-01-01 01:00:00 +01:00"
3756        );
3757        assert_eq!(
3758            batch
3759                .column(1)
3760                .as_any()
3761                .downcast_ref::<Date64Array>()
3762                .expect("downcast to date64")
3763                .value_as_date(0)
3764                .map(|v| v.to_string())
3765                .expect("value as date"),
3766            "1970-01-01"
3767        );
3768
3769        let nested = batch
3770            .column(2)
3771            .as_any()
3772            .downcast_ref::<StructArray>()
3773            .expect("downcast to struct");
3774
3775        let nested_dict = nested
3776            .column(0)
3777            .as_any()
3778            .downcast_ref::<Int32DictionaryArray>()
3779            .expect("downcast to dictionary");
3780
3781        assert_eq!(
3782            nested_dict
3783                .values()
3784                .as_any()
3785                .downcast_ref::<StringArray>()
3786                .expect("downcast to string")
3787                .iter()
3788                .collect::<Vec<_>>(),
3789            vec![Some("a"), Some("b")]
3790        );
3791
3792        assert_eq!(
3793            nested_dict.keys().iter().collect::<Vec<_>>(),
3794            vec![Some(0), Some(0), Some(0), Some(1)]
3795        );
3796
3797        assert_eq!(
3798            nested
3799                .column(1)
3800                .as_any()
3801                .downcast_ref::<TimestampNanosecondArray>()
3802                .expect("downcast to timestamp nanosecond")
3803                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3804                .map(|v| v.to_string())
3805                .expect("value as datetime"),
3806            "1970-01-01 10:00:00.000000001 +10:00"
3807        );
3808    }
3809
3810    #[test]
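    // An empty projection should still yield batches with the correct row counts
    // and zero columns.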
3811    fn test_empty_projection() {
3812        let testdata = arrow::util::test_util::parquet_test_data();
3813        let path = format!("{testdata}/alltypes_plain.parquet");
3814        let file = File::open(path).unwrap();
3815
3816        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3817        let file_metadata = builder.metadata().file_metadata();
3818        let expected_rows = file_metadata.num_rows() as usize;
3819
3820        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3821        let batch_reader = builder
3822            .with_projection(mask)
3823            .with_batch_size(2)
3824            .build()
3825            .unwrap();
3826
3827        let mut total_rows = 0;
3828        for maybe_batch in batch_reader {
3829            let batch = maybe_batch.unwrap();
3830            total_rows += batch.num_rows();
3831            assert_eq!(batch.num_columns(), 0);
3832            assert!(batch.num_rows() <= 2);
3833        }
3834
3835        assert_eq!(total_rows, expected_rows);
3836    }
3837
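    /// Writes two batches of `batch_size` empty lists with the given maximum row group
    /// size, then reads them back and checks that each batch contains `batch_size` rows.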
3838    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3839        let schema = Arc::new(Schema::new(vec![Field::new(
3840            "list",
3841            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3842            true,
3843        )]));
3844
3845        let mut buf = Vec::with_capacity(1024);
3846
3847        let mut writer = ArrowWriter::try_new(
3848            &mut buf,
3849            schema.clone(),
3850            Some(
3851                WriterProperties::builder()
3852                    .set_max_row_group_size(row_group_size)
3853                    .build(),
3854            ),
3855        )
3856        .unwrap();
3857        for _ in 0..2 {
3858            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3859            for _ in 0..(batch_size) {
3860                list_builder.append(true);
3861            }
3862            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3863                .unwrap();
3864            writer.write(&batch).unwrap();
3865        }
3866        writer.close().unwrap();
3867
3868        let mut record_reader =
3869            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3870        assert_eq!(
3871            batch_size,
3872            record_reader.next().unwrap().unwrap().num_rows()
3873        );
3874        assert_eq!(
3875            batch_size,
3876            record_reader.next().unwrap().unwrap().num_rows()
3877        );
3878    }
3879
3880    #[test]
3881    fn test_row_group_exact_multiple() {
3882        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3883        test_row_group_batch(8, 8);
3884        test_row_group_batch(10, 8);
3885        test_row_group_batch(8, 10);
3886        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3887        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3888        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3889        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3890        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3891    }
3892
3893    /// Given a RecordBatch containing all the column data, return the expected batches given
3894    /// a `batch_size` and `selection`
3895    fn get_expected_batches(
3896        column: &RecordBatch,
3897        selection: &RowSelection,
3898        batch_size: usize,
3899    ) -> Vec<RecordBatch> {
3900        let mut expected_batches = vec![];
3901
3902        let mut selection: VecDeque<_> = selection.clone().into();
3903        let mut row_offset = 0;
3904        let mut last_start = None;
3905        while row_offset < column.num_rows() && !selection.is_empty() {
3906            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3907            while batch_remaining > 0 && !selection.is_empty() {
3908                let (to_read, skip) = match selection.front_mut() {
3909                    Some(selection) if selection.row_count > batch_remaining => {
3910                        selection.row_count -= batch_remaining;
3911                        (batch_remaining, selection.skip)
3912                    }
3913                    Some(_) => {
3914                        let select = selection.pop_front().unwrap();
3915                        (select.row_count, select.skip)
3916                    }
3917                    None => break,
3918                };
3919
3920                batch_remaining -= to_read;
3921
3922                match skip {
3923                    true => {
3924                        if let Some(last_start) = last_start.take() {
3925                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3926                        }
3927                        row_offset += to_read
3928                    }
3929                    false => {
3930                        last_start.get_or_insert(row_offset);
3931                        row_offset += to_read
3932                    }
3933                }
3934            }
3935        }
3936
3937        if let Some(last_start) = last_start.take() {
3938            expected_batches.push(column.slice(last_start, row_offset - last_start))
3939        }
3940
3941        // Sanity check: all batches except the final one should have exactly `batch_size` rows
3942        for batch in &expected_batches[..expected_batches.len() - 1] {
3943            assert_eq!(batch.num_rows(), batch_size);
3944        }
3945
3946        expected_batches
3947    }
3948
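    /// Builds a `RowSelection` alternating between runs of `step_len` selected and
    /// skipped rows over `total_len` rows (starting with a skip when `skip_first`),
    /// returning the selection together with the number of selected rows.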
3949    fn create_test_selection(
3950        step_len: usize,
3951        total_len: usize,
3952        skip_first: bool,
3953    ) -> (RowSelection, usize) {
3954        let mut remaining = total_len;
3955        let mut skip = skip_first;
3956        let mut vec = vec![];
3957        let mut selected_count = 0;
3958        while remaining != 0 {
3959            let step = if remaining > step_len {
3960                step_len
3961            } else {
3962                remaining
3963            };
3964            vec.push(RowSelector {
3965                row_count: step,
3966                skip,
3967            });
3968            remaining -= step;
3969            if !skip {
3970                selected_count += step;
3971            }
3972            skip = !skip;
3973        }
3974        (vec.into(), selected_count)
3975    }
3976
3977    #[test]
3978    fn test_scan_row_with_selection() {
3979        let testdata = arrow::util::test_util::parquet_test_data();
3980        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3981        let test_file = File::open(&path).unwrap();
3982
3983        let mut serial_reader =
3984            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3985        let data = serial_reader.next().unwrap().unwrap();
3986
3987        let do_test = |batch_size: usize, selection_len: usize| {
3988            for skip_first in [false, true] {
3989                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3990
3991                let expected = get_expected_batches(&data, &selections, batch_size);
3992                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3993                assert_eq!(
3994                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3995                    expected,
3996                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3997                );
3998            }
3999        };
4000
4001        // total row count 7300
4002        // 1. test selection len more than one page row count
4003        do_test(1000, 1000);
4004
4005        // 2. test selection len less than one page row count
4006        do_test(20, 20);
4007
4008        // 3. test selection_len less than batch_size
4009        do_test(20, 5);
4010
4011        // 4. test selection_len more than batch_size
4012        // If batch_size < selection_len
4013        do_test(20, 5);
4014
4015        fn create_skip_reader(
4016            test_file: &File,
4017            batch_size: usize,
4018            selections: RowSelection,
4019        ) -> ParquetRecordBatchReader {
4020            let options = ArrowReaderOptions::new().with_page_index(true);
4021            let file = test_file.try_clone().unwrap();
4022            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4023                .unwrap()
4024                .with_batch_size(batch_size)
4025                .with_row_selection(selections)
4026                .build()
4027                .unwrap()
4028        }
4029    }
4030
4031    #[test]
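    // A batch size larger than the file should be clamped down to the number of
    // rows actually present in the file.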
4032    fn test_batch_size_overallocate() {
4033        let testdata = arrow::util::test_util::parquet_test_data();
4034        // `alltypes_plain.parquet` only has 8 rows
4035        let path = format!("{testdata}/alltypes_plain.parquet");
4036        let test_file = File::open(path).unwrap();
4037
4038        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4039        let num_rows = builder.metadata.file_metadata().num_rows();
4040        let reader = builder
4041            .with_batch_size(1024)
4042            .with_projection(ProjectionMask::all())
4043            .build()
4044            .unwrap();
4045        assert_ne!(1024, num_rows);
4046        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4047    }
4048
4049    #[test]
4050    fn test_read_with_page_index_enabled() {
4051        let testdata = arrow::util::test_util::parquet_test_data();
4052
4053        {
4054            // `alltypes_tiny_pages.parquet` has page index
4055            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4056            let test_file = File::open(path).unwrap();
4057            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4058                test_file,
4059                ArrowReaderOptions::new().with_page_index(true),
4060            )
4061            .unwrap();
4062            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4063            let reader = builder.build().unwrap();
4064            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4065            assert_eq!(batches.len(), 8);
4066        }
4067
4068        {
4069            // `alltypes_plain.parquet` doesn't have page index
4070            let path = format!("{testdata}/alltypes_plain.parquet");
4071            let test_file = File::open(path).unwrap();
4072            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4073                test_file,
4074                ArrowReaderOptions::new().with_page_index(true),
4075            )
4076            .unwrap();
4077            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4078            // the file should still be read successfully.
4079            assert!(builder.metadata().offset_index().is_none());
4080            let reader = builder.build().unwrap();
4081            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4082            assert_eq!(batches.len(), 1);
4083        }
4084    }
4085
4086    #[test]
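    // Writes optional and repeated columns with the low-level writer and checks that
    // each leaf column can be projected individually and matches the unprojected read.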
4087    fn test_raw_repetition() {
4088        const MESSAGE_TYPE: &str = "
4089            message Log {
4090              OPTIONAL INT32 eventType;
4091              REPEATED INT32 category;
4092              REPEATED group filter {
4093                OPTIONAL INT32 error;
4094              }
4095            }
4096        ";
4097        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4098        let props = Default::default();
4099
4100        let mut buf = Vec::with_capacity(1024);
4101        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4102        let mut row_group_writer = writer.next_row_group().unwrap();
4103
4104        // column 0
4105        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4106        col_writer
4107            .typed::<Int32Type>()
4108            .write_batch(&[1], Some(&[1]), None)
4109            .unwrap();
4110        col_writer.close().unwrap();
4111        // column 1
4112        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4113        col_writer
4114            .typed::<Int32Type>()
4115            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4116            .unwrap();
4117        col_writer.close().unwrap();
4118        // column 2
4119        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4120        col_writer
4121            .typed::<Int32Type>()
4122            .write_batch(&[1], Some(&[1]), Some(&[0]))
4123            .unwrap();
4124        col_writer.close().unwrap();
4125
4126        let rg_md = row_group_writer.close().unwrap();
4127        assert_eq!(rg_md.num_rows(), 1);
4128        writer.close().unwrap();
4129
4130        let bytes = Bytes::from(buf);
4131
4132        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4133        let full = no_mask.next().unwrap().unwrap();
4134
4135        assert_eq!(full.num_columns(), 3);
4136
4137        for idx in 0..3 {
4138            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4139            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4140            let mut reader = b.with_projection(mask).build().unwrap();
4141            let projected = reader.next().unwrap().unwrap();
4142
4143            assert_eq!(projected.num_columns(), 1);
4144            assert_eq!(full.column(idx), projected.column(0));
4145        }
4146    }
4147
4148    #[test]
4149    fn test_read_lz4_raw() {
4150        let testdata = arrow::util::test_util::parquet_test_data();
4151        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4152        let file = File::open(path).unwrap();
4153
4154        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4155            .unwrap()
4156            .collect::<Result<Vec<_>, _>>()
4157            .unwrap();
4158        assert_eq!(batches.len(), 1);
4159        let batch = &batches[0];
4160
4161        assert_eq!(batch.num_columns(), 3);
4162        assert_eq!(batch.num_rows(), 4);
4163
4164        // https://github.com/apache/parquet-testing/pull/18
4165        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4166        assert_eq!(
4167            a.values(),
4168            &[1593604800, 1593604800, 1593604801, 1593604801]
4169        );
4170
4171        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4172        let a: Vec<_> = a.iter().flatten().collect();
4173        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4174
4175        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4176        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4177    }
4178
4179    // This test ensures backward compatibility. It reads 2 files that both declare the LZ4
4180    // CompressionCodec but were compressed with different algorithms: LZ4_HADOOP and LZ4_RAW.
4181    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4182    //    LZ4_HADOOP algorithm for compression.
4183    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4184    //    LZ4_RAW algorithm for compression. The reader falls back to LZ4_RAW to stay compatible
4185    //    with files written by older parquet-cpp versions.
4186    //
4187    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
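    // Both files should decode into the same 4-row, 3-column batch; the loop below asserts
    // identical column values for each of them.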
4188    #[test]
4189    fn test_read_lz4_hadoop_fallback() {
4190        for file in [
4191            "hadoop_lz4_compressed.parquet",
4192            "non_hadoop_lz4_compressed.parquet",
4193        ] {
4194            let testdata = arrow::util::test_util::parquet_test_data();
4195            let path = format!("{testdata}/{file}");
4196            let file = File::open(path).unwrap();
4197            let expected_rows = 4;
4198
4199            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4200                .unwrap()
4201                .collect::<Result<Vec<_>, _>>()
4202                .unwrap();
4203            assert_eq!(batches.len(), 1);
4204            let batch = &batches[0];
4205
4206            assert_eq!(batch.num_columns(), 3);
4207            assert_eq!(batch.num_rows(), expected_rows);
4208
4209            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4210            assert_eq!(
4211                a.values(),
4212                &[1593604800, 1593604800, 1593604801, 1593604801]
4213            );
4214
4215            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4216            let b: Vec<_> = b.iter().flatten().collect();
4217            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4218
4219            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4220            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4221        }
4222    }
4223
4224    #[test]
4225    fn test_read_lz4_hadoop_large() {
4226        let testdata = arrow::util::test_util::parquet_test_data();
4227        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4228        let file = File::open(path).unwrap();
4229        let expected_rows = 10000;
4230
4231        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4232            .unwrap()
4233            .collect::<Result<Vec<_>, _>>()
4234            .unwrap();
4235        assert_eq!(batches.len(), 1);
4236        let batch = &batches[0];
4237
4238        assert_eq!(batch.num_columns(), 1);
4239        assert_eq!(batch.num_rows(), expected_rows);
4240
4241        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4242        let a: Vec<_> = a.iter().flatten().collect();
4243        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4244        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4245        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4246        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4247    }
4248
4249    #[test]
4250    #[cfg(feature = "snap")]
4251    fn test_read_nested_lists() {
4252        let testdata = arrow::util::test_util::parquet_test_data();
4253        let path = format!("{testdata}/nested_lists.snappy.parquet");
4254        let file = File::open(path).unwrap();
4255
4256        let f = file.try_clone().unwrap();
4257        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4258        let expected = reader.next().unwrap().unwrap();
4259        assert_eq!(expected.num_rows(), 3);
4260
4261        let selection = RowSelection::from(vec![
4262            RowSelector::skip(1),
4263            RowSelector::select(1),
4264            RowSelector::skip(1),
4265        ]);
4266        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4267            .unwrap()
4268            .with_row_selection(selection)
4269            .build()
4270            .unwrap();
4271
4272        let actual = reader.next().unwrap().unwrap();
4273        assert_eq!(actual.num_rows(), 1);
4274        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4275    }
4276
4277    #[test]
4278    fn test_arbitrary_decimal() {
4279        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4280        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4281            .with_precision_and_scale(19, 0)
4282            .unwrap();
4283        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4284            .with_precision_and_scale(12, 0)
4285            .unwrap();
4286        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4287            .with_precision_and_scale(17, 10)
4288            .unwrap();
4289
4290        let written = RecordBatch::try_from_iter([
4291            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4292            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4293            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4294        ])
4295        .unwrap();
4296
4297        let mut buffer = Vec::with_capacity(1024);
4298        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4299        writer.write(&written).unwrap();
4300        writer.close().unwrap();
4301
4302        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4303            .unwrap()
4304            .collect::<Result<Vec<_>, _>>()
4305            .unwrap();
4306
4307        assert_eq!(&written.slice(0, 8), &read[0]);
4308    }
4309
4310    #[test]
4311    fn test_list_skip() {
4312        let mut list = ListBuilder::new(Int32Builder::new());
4313        list.append_value([Some(1), Some(2)]);
4314        list.append_value([Some(3)]);
4315        list.append_value([Some(4)]);
4316        let list = list.finish();
4317        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4318
4319        // First page contains 2 values but only 1 row
4320        let props = WriterProperties::builder()
4321            .set_data_page_row_count_limit(1)
4322            .set_write_batch_size(2)
4323            .build();
4324
4325        let mut buffer = Vec::with_capacity(1024);
4326        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4327        writer.write(&batch).unwrap();
4328        writer.close().unwrap();
4329
4330        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4331        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4332            .unwrap()
4333            .with_row_selection(selection.into())
4334            .build()
4335            .unwrap();
4336        let out = reader.next().unwrap().unwrap();
4337        assert_eq!(out.num_rows(), 1);
4338        assert_eq!(out, batch.slice(2, 1));
4339    }
4340
4341    fn test_decimal_roundtrip<T: DecimalType>() {
4342        // Precision <= 9 -> INT32
4343        // Precision <= 18 -> INT64
4344        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
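        // The arrays below exercise each branch of this mapping: d1 (precision 9) -> INT32,
        // d2 (precision 10) and d3 (precision 18) -> INT64, and d4 (precision 19) ->
        // FIXED_LEN_BYTE_ARRAY, which is what the physical type assertions check after the
        // round trip.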
4345
4346        let d = |values: Vec<usize>, p: u8| {
4347            let iter = values.into_iter().map(T::Native::usize_as);
4348            PrimitiveArray::<T>::from_iter_values(iter)
4349                .with_precision_and_scale(p, 2)
4350                .unwrap()
4351        };
4352
4353        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4354        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4355        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4356        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4357
4358        let batch = RecordBatch::try_from_iter([
4359            ("d1", Arc::new(d1) as ArrayRef),
4360            ("d2", Arc::new(d2) as ArrayRef),
4361            ("d3", Arc::new(d3) as ArrayRef),
4362            ("d4", Arc::new(d4) as ArrayRef),
4363        ])
4364        .unwrap();
4365
4366        let mut buffer = Vec::with_capacity(1024);
4367        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4368        writer.write(&batch).unwrap();
4369        writer.close().unwrap();
4370
4371        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4372        let t1 = builder.parquet_schema().columns()[0].physical_type();
4373        assert_eq!(t1, PhysicalType::INT32);
4374        let t2 = builder.parquet_schema().columns()[1].physical_type();
4375        assert_eq!(t2, PhysicalType::INT64);
4376        let t3 = builder.parquet_schema().columns()[2].physical_type();
4377        assert_eq!(t3, PhysicalType::INT64);
4378        let t4 = builder.parquet_schema().columns()[3].physical_type();
4379        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4380
4381        let mut reader = builder.build().unwrap();
4382        assert_eq!(batch.schema(), reader.schema());
4383
4384        let out = reader.next().unwrap().unwrap();
4385        assert_eq!(batch, out);
4386    }
4387
4388    #[test]
4389    fn test_decimal() {
4390        test_decimal_roundtrip::<Decimal128Type>();
4391        test_decimal_roundtrip::<Decimal256Type>();
4392    }
4393
4394    #[test]
4395    fn test_list_selection() {
4396        let schema = Arc::new(Schema::new(vec![Field::new_list(
4397            "list",
4398            Field::new_list_field(ArrowDataType::Utf8, true),
4399            false,
4400        )]));
4401        let mut buf = Vec::with_capacity(1024);
4402
4403        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4404
4405        for i in 0..2 {
4406            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4407            for j in 0..1024 {
4408                list_a_builder.values().append_value(format!("{i} {j}"));
4409                list_a_builder.append(true);
4410            }
4411            let batch =
4412                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4413                    .unwrap();
4414            writer.write(&batch).unwrap();
4415        }
4416        let _metadata = writer.close().unwrap();
4417
4418        let buf = Bytes::from(buf);
4419        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4420            .unwrap()
4421            .with_row_selection(RowSelection::from(vec![
4422                RowSelector::skip(100),
4423                RowSelector::select(924),
4424                RowSelector::skip(100),
4425                RowSelector::select(924),
4426            ]))
4427            .build()
4428            .unwrap();
4429
4430        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4431        let batch = concat_batches(&schema, &batches).unwrap();
4432
4433        assert_eq!(batch.num_rows(), 924 * 2);
4434        let list = batch.column(0).as_list::<i32>();
4435
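        // Each list written above contains exactly one element, so consecutive value offsets
        // must still differ by exactly 1 after the row selection is applied.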
4436        for w in list.value_offsets().windows(2) {
4437            assert_eq!(w[0] + 1, w[1])
4438        }
4439        let mut values = list.values().as_string::<i32>().iter();
4440
4441        for i in 0..2 {
4442            for j in 100..1024 {
4443                let expected = format!("{i} {j}");
4444                assert_eq!(values.next().unwrap().unwrap(), &expected);
4445            }
4446        }
4447    }
4448
4449    #[test]
4450    fn test_list_selection_fuzz() {
4451        let mut rng = rng();
4452        let schema = Arc::new(Schema::new(vec![Field::new_list(
4453            "list",
4454            Field::new_list(
4455                Field::LIST_FIELD_DEFAULT_NAME,
4456                Field::new_list_field(ArrowDataType::Int32, true),
4457                true,
4458            ),
4459            true,
4460        )]));
4461        let mut buf = Vec::with_capacity(1024);
4462        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4463
4464        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4465
4466        for _ in 0..2048 {
4467            if rng.random_bool(0.2) {
4468                list_a_builder.append(false);
4469                continue;
4470            }
4471
4472            let list_a_len = rng.random_range(0..10);
4473            let list_b_builder = list_a_builder.values();
4474
4475            for _ in 0..list_a_len {
4476                if rng.random_bool(0.2) {
4477                    list_b_builder.append(false);
4478                    continue;
4479                }
4480
4481                let list_b_len = rng.random_range(0..10);
4482                let int_builder = list_b_builder.values();
4483                for _ in 0..list_b_len {
4484                    match rng.random_bool(0.2) {
4485                        true => int_builder.append_null(),
4486                        false => int_builder.append_value(rng.random()),
4487                    }
4488                }
4489                list_b_builder.append(true)
4490            }
4491            list_a_builder.append(true);
4492        }
4493
4494        let array = Arc::new(list_a_builder.finish());
4495        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4496
4497        writer.write(&batch).unwrap();
4498        let _metadata = writer.close().unwrap();
4499
4500        let buf = Bytes::from(buf);
4501
4502        let cases = [
4503            vec![
4504                RowSelector::skip(100),
4505                RowSelector::select(924),
4506                RowSelector::skip(100),
4507                RowSelector::select(924),
4508            ],
4509            vec![
4510                RowSelector::select(924),
4511                RowSelector::skip(100),
4512                RowSelector::select(924),
4513                RowSelector::skip(100),
4514            ],
4515            vec![
4516                RowSelector::skip(1023),
4517                RowSelector::select(1),
4518                RowSelector::skip(1023),
4519                RowSelector::select(1),
4520            ],
4521            vec![
4522                RowSelector::select(1),
4523                RowSelector::skip(1023),
4524                RowSelector::select(1),
4525                RowSelector::skip(1023),
4526            ],
4527        ];
4528
4529        for batch_size in [100, 1024, 2048] {
4530            for selection in &cases {
4531                let selection = RowSelection::from(selection.clone());
4532                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4533                    .unwrap()
4534                    .with_row_selection(selection.clone())
4535                    .with_batch_size(batch_size)
4536                    .build()
4537                    .unwrap();
4538
4539                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4540                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4541                assert_eq!(actual.num_rows(), selection.row_count());
4542
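                // Walk the selectors: skipped ranges only advance the offset into the written
                // batch, while selected ranges must match the next slice of the decoded output.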
4543                let mut batch_offset = 0;
4544                let mut actual_offset = 0;
4545                for selector in selection.iter() {
4546                    if selector.skip {
4547                        batch_offset += selector.row_count;
4548                        continue;
4549                    }
4550
4551                    assert_eq!(
4552                        batch.slice(batch_offset, selector.row_count),
4553                        actual.slice(actual_offset, selector.row_count)
4554                    );
4555
4556                    batch_offset += selector.row_count;
4557                    actual_offset += selector.row_count;
4558                }
4559            }
4560        }
4561    }
4562
4563    #[test]
4564    fn test_read_old_nested_list() {
4565        use arrow::datatypes::DataType;
4566        use arrow::datatypes::ToByteSlice;
4567
4568        let testdata = arrow::util::test_util::parquet_test_data();
4569        // message my_record {
4570        //     REQUIRED group a (LIST) {
4571        //         REPEATED group array (LIST) {
4572        //             REPEATED INT32 array;
4573        //         }
4574        //     }
4575        // }
4576        // should be read as list<list<int32>>
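        // The file contains a single row whose value is the nested list [[1, 2], [3, 4]];
        // the expected inner `ListArray` is constructed manually below for comparison.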
4577        let path = format!("{testdata}/old_list_structure.parquet");
4578        let test_file = File::open(path).unwrap();
4579
4580        // create expected ListArray
4581        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4582
4583        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4584        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4585
4586        // Construct a list array from the above two
4587        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4588            "array",
4589            DataType::Int32,
4590            false,
4591        ))))
4592        .len(2)
4593        .add_buffer(a_value_offsets)
4594        .add_child_data(a_values.into_data())
4595        .build()
4596        .unwrap();
4597        let a = ListArray::from(a_list_data);
4598
4599        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4600        let mut reader = builder.build().unwrap();
4601        let out = reader.next().unwrap().unwrap();
4602        assert_eq!(out.num_rows(), 1);
4603        assert_eq!(out.num_columns(), 1);
4604        // grab first column
4605        let c0 = out.column(0);
4606        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4607        // get first row: [[1, 2], [3, 4]]
4608        let r0 = c0arr.value(0);
4609        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4610        assert_eq!(r0arr, &a);
4611    }
4612
4613    #[test]
4614    fn test_map_no_value() {
4615        // File schema:
4616        // message schema {
4617        //   required group my_map (MAP) {
4618        //     repeated group key_value {
4619        //       required int32 key;
4620        //       optional int32 value;
4621        //     }
4622        //   }
4623        //   required group my_map_no_v (MAP) {
4624        //     repeated group key_value {
4625        //       required int32 key;
4626        //     }
4627        //   }
4628        //   required group my_list (LIST) {
4629        //     repeated group list {
4630        //       required int32 element;
4631        //     }
4632        //   }
4633        // }
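        // my_map_no_v is a MAP whose key_value group has no value field; it is expected to be
        // read as a list of its keys, making it equivalent to my_list as asserted below.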
4634        let testdata = arrow::util::test_util::parquet_test_data();
4635        let path = format!("{testdata}/map_no_value.parquet");
4636        let file = File::open(path).unwrap();
4637
4638        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4639            .unwrap()
4640            .build()
4641            .unwrap();
4642        let out = reader.next().unwrap().unwrap();
4643        assert_eq!(out.num_rows(), 3);
4644        assert_eq!(out.num_columns(), 3);
4645        // the my_map_no_v and my_list columns should be equivalent
4646        let c0 = out.column(1).as_list::<i32>();
4647        let c1 = out.column(2).as_list::<i32>();
4648        assert_eq!(c0.len(), c1.len());
4649        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4650    }
4651}