parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.

//! Contains the reader that reads Parquet data into Arrow [`RecordBatch`]es

20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24use arrow_select::filter::prep_null_mask_filter;
25pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
26pub use selection::{RowSelection, RowSelector};
27use std::collections::VecDeque;
28use std::sync::Arc;
29
30pub use crate::arrow::array_reader::RowGroups;
31use crate::arrow::array_reader::{build_array_reader, ArrayReader};
32use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
33use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
34use crate::column::page::{PageIterator, PageReader};
35#[cfg(feature = "encryption")]
36use crate::encryption::decrypt::FileDecryptionProperties;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39use crate::file::reader::{ChunkReader, SerializedPageReader};
40use crate::schema::types::SchemaDescriptor;
41
42mod filter;
43mod selection;
44pub mod statistics;
45
46/// Builder for constructing Parquet readers that decode into [Apache Arrow]
47/// arrays.
48///
49/// Most users should use one of the following specializations:
50///
51/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
52/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
53///
54/// # Features
55/// * Projection pushdown: [`Self::with_projection`]
56/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Limit and offset skipping: [`Self::with_limit`] and [`Self::with_offset`]
58/// * Row group filtering: [`Self::with_row_groups`]
59/// * Range filtering: [`Self::with_row_selection`]
60/// * Row level filtering: [`Self::with_row_filter`]
61///
62/// # Implementing Predicate Pushdown
63///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the lowest level optimizations.
66///
67/// However, most Parquet based systems will apply filters at many steps prior
68/// to decoding such as pruning files, row groups and data pages. This crate
69/// provides the low level APIs needed to implement such filtering, but does not
70/// include any logic to actually evaluate predicates. For example:
71///
72/// * [`Self::with_row_groups`] for Row Group pruning
73/// * [`Self::with_row_selection`] for data page pruning
74/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
75///
76/// The rationale for this design is that implementing predicate pushdown is a
77/// complex topic and varies significantly from system to system. For example
78///
79/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
80/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
81/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
82/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
83///
84/// You can read more about this design in the [Querying Parquet with
85/// Millisecond Latency] Arrow blog post.
86///
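/// # Example
///
/// A minimal sketch of the synchronous API combining a few of the options
/// above; the data is written to an in-memory buffer purely so the example is
/// self contained:
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::ArrowWriter;
/// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
/// # let mut buf = Vec::new();
/// # let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..100))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))
///     .unwrap()
///     .with_row_groups(vec![0]) // only read the first row group
///     .with_batch_size(32)      // decode at most 32 rows at a time
///     .build()
///     .unwrap();
/// let first_batch = reader.next().unwrap().unwrap();
/// assert_eq!(first_batch.num_rows(), 32);
/// ```
///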
87/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
88/// [Apache Arrow]: https://arrow.apache.org/
89/// [`StatisticsConverter`]: statistics::StatisticsConverter
90/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
91pub struct ArrowReaderBuilder<T> {
92    pub(crate) input: T,
93
94    pub(crate) metadata: Arc<ParquetMetaData>,
95
96    pub(crate) schema: SchemaRef,
97
98    pub(crate) fields: Option<Arc<ParquetField>>,
99
100    pub(crate) batch_size: usize,
101
102    pub(crate) row_groups: Option<Vec<usize>>,
103
104    pub(crate) projection: ProjectionMask,
105
106    pub(crate) filter: Option<RowFilter>,
107
108    pub(crate) selection: Option<RowSelection>,
109
110    pub(crate) limit: Option<usize>,
111
112    pub(crate) offset: Option<usize>,
113}
114
115impl<T> ArrowReaderBuilder<T> {
116    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
117        Self {
118            input,
119            metadata: metadata.metadata,
120            schema: metadata.schema,
121            fields: metadata.fields,
122            batch_size: 1024,
123            row_groups: None,
124            projection: ProjectionMask::all(),
125            filter: None,
126            selection: None,
127            limit: None,
128            offset: None,
129        }
130    }
131
132    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
133    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
134        &self.metadata
135    }
136
137    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
138    pub fn parquet_schema(&self) -> &SchemaDescriptor {
139        self.metadata.file_metadata().schema_descr()
140    }
141
142    /// Returns the arrow [`SchemaRef`] for this parquet file
143    pub fn schema(&self) -> &SchemaRef {
144        &self.schema
145    }
146
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If `batch_size` is greater than the number of rows in the file, the
    /// number of rows in the file is used instead.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp to the number of rows in the file to avoid allocating an unnecessarily large buffer
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
152        Self { batch_size, ..self }
153    }
154
155    /// Only read data from the provided row group indexes
156    ///
157    /// This is also called row group filtering
158    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
159        Self {
160            row_groups: Some(row_groups),
161            ..self
162        }
163    }
164
165    /// Only read data from the provided column indexes
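    ///
    /// # Example
    ///
    /// A short sketch selecting a single column by name; the file is written
    /// to an in-memory buffer only so the example is self contained:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, Int64Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::{ArrowWriter, ProjectionMask};
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # let schema = Arc::new(Schema::new(vec![
    /// #     Field::new("a", DataType::Int32, false),
    /// #     Field::new("b", DataType::Int64, false),
    /// # ]));
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![
    /// #     Arc::new(Int32Array::from(vec![1, 2, 3])),
    /// #     Arc::new(Int64Array::from(vec![4, 5, 6])),
    /// # ]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    /// // Only decode the leaf column named "a"
    /// let mask = ProjectionMask::columns(builder.parquet_schema(), ["a"]);
    /// let mut reader = builder.with_projection(mask).build().unwrap();
    /// let batch = reader.next().unwrap().unwrap();
    /// assert_eq!(batch.num_columns(), 1);
    /// ```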
166    pub fn with_projection(self, mask: ProjectionMask) -> Self {
167        Self {
168            projection: mask,
169            ..self
170        }
171    }
172
173    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
174    /// data into memory.
175    ///
176    /// This feature is used to restrict which rows are decoded within row
177    /// groups, skipping ranges of rows that are not needed. Such selections
178    /// could be determined by evaluating predicates against the parquet page
179    /// [`Index`] or some other external information available to a query
180    /// engine.
181    ///
182    /// # Notes
183    ///
184    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
185    /// applying the row selection, and therefore rows from skipped row groups
186    /// should not be included in the [`RowSelection`] (see example below)
187    ///
    /// It is recommended to enable reading the page index if using this
    /// functionality, to allow more efficient skipping over data pages. See
190    /// [`ArrowReaderOptions::with_page_index`].
191    ///
192    /// # Example
193    ///
194    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
195    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
196    /// row group 3:
197    ///
198    /// ```text
199    ///   Row Group 0, 1000 rows (selected)
200    ///   Row Group 1, 1000 rows (skipped)
201    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
202    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
203    /// ```
204    ///
205    /// You could pass the following [`RowSelection`]:
206    ///
207    /// ```text
208    ///  Select 1000    (scan all rows in row group 0)
209    ///  Skip 50        (skip the first 50 rows in row group 2)
210    ///  Select 50      (scan rows 50-100 in row group 2)
211    ///  Skip 900       (skip the remaining rows in row group 2)
212    ///  Skip 200       (skip the first 200 rows in row group 3)
213    ///  Select 100     (scan rows 200-300 in row group 3)
214    ///  Skip 700       (skip the remaining rows in row group 3)
215    /// ```
216    /// Note there is no entry for the (entirely) skipped row group 1.
217    ///
218    /// Note you can represent the same selection with fewer entries. Instead of
219    ///
220    /// ```text
221    ///  Skip 900       (skip the remaining rows in row group 2)
222    ///  Skip 200       (skip the first 200 rows in row group 3)
223    /// ```
224    ///
225    /// you could use
226    ///
227    /// ```text
228    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
229    /// ```
230    ///
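    /// Expressed in code, the selection above could be constructed as follows
    /// (a sketch, independent of any particular file):
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```
    ///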
231    /// [`Index`]: crate::file::page_index::index::Index
232    pub fn with_row_selection(self, selection: RowSelection) -> Self {
233        Self {
234            selection: Some(selection),
235            ..self
236        }
237    }
238
239    /// Provide a [`RowFilter`] to skip decoding rows
240    ///
241    /// Row filters are applied after row group selection and row selection
242    ///
243    /// It is recommended to enable reading the page index if using this functionality, to allow
244    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
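    ///
    /// # Example
    ///
    /// A minimal sketch constructing a [`RowFilter`] from a single
    /// [`ArrowPredicateFn`]. The predicate here trivially keeps every row; a
    /// real predicate would inspect the columns selected by its projection:
    ///
    /// ```
    /// use arrow_array::{BooleanArray, RecordBatch};
    /// use parquet::arrow::ProjectionMask;
    /// use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    ///
    /// // Evaluate the predicate against batches containing all columns
    /// let predicate = ArrowPredicateFn::new(
    ///     ProjectionMask::all(),
    ///     |batch: RecordBatch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
    /// );
    /// let filter = RowFilter::new(vec![Box::new(predicate)]);
    /// ```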
245    pub fn with_row_filter(self, filter: RowFilter) -> Self {
246        Self {
247            filter: Some(filter),
248            ..self
249        }
250    }
251
252    /// Provide a limit to the number of rows to be read
253    ///
254    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
255    /// allowing it to limit the final set of rows decoded after any pushed down predicates
256    ///
257    /// It is recommended to enable reading the page index if using this functionality, to allow
258    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
259    pub fn with_limit(self, limit: usize) -> Self {
260        Self {
261            limit: Some(limit),
262            ..self
263        }
264    }
265
266    /// Provide an offset to skip over the given number of rows
267    ///
268    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
269    /// allowing it to skip rows after any pushed down predicates
270    ///
271    /// It is recommended to enable reading the page index if using this functionality, to allow
272    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
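    ///
    /// # Example
    ///
    /// A sketch of paginating through a file by combining an offset with a
    /// limit (an in-memory buffer is used so the example is self contained):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..100))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// // Skip the first 25 rows and then decode at most 10 rows
    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))
    ///     .unwrap()
    ///     .with_offset(25)
    ///     .with_limit(10)
    ///     .build()
    ///     .unwrap();
    /// let batch = reader.next().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 10);
    /// ```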
273    pub fn with_offset(self, offset: usize) -> Self {
274        Self {
275            offset: Some(offset),
276            ..self
277        }
278    }
279}
280
281/// Options that control how metadata is read for a parquet file
282///
283/// See [`ArrowReaderBuilder`] for how to configure how the column data
284/// is then read from the file, including projection and filter pushdown
285#[derive(Debug, Clone, Default)]
286pub struct ArrowReaderOptions {
287    /// Should the reader strip any user defined metadata from the Arrow schema
288    skip_arrow_metadata: bool,
    /// If provided, this is used as the schema hint when determining the Arrow
    /// schema; otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
291    ///
292    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
293    supplied_schema: Option<SchemaRef>,
294    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
295    pub(crate) page_index: bool,
296    /// If encryption is enabled, the file decryption properties can be provided
297    #[cfg(feature = "encryption")]
298    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
299}
300
301impl ArrowReaderOptions {
302    /// Create a new [`ArrowReaderOptions`] with the default settings
303    pub fn new() -> Self {
304        Self::default()
305    }
306
307    /// Skip decoding the embedded arrow metadata (defaults to `false`)
308    ///
309    /// Parquet files generated by some writers may contain embedded arrow
310    /// schema and metadata.
311    /// This may not be correct or compatible with your system,
312    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
313    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
314        Self {
315            skip_arrow_metadata,
316            ..self
317        }
318    }
319
320    /// Provide a schema hint to use when reading the Parquet file.
321    ///
322    /// If provided, this schema takes precedence over any arrow schema embedded
323    /// in the metadata (see the [`arrow`] documentation for more details).
324    ///
    /// If the provided schema is not compatible with the schema stored in the
    /// parquet file, an error will be returned when constructing the builder.
    ///
    /// This option is only required if you want to explicitly control the
    /// conversion of Parquet types to Arrow types, such as casting a column to
    /// a different type. For example, you could use it to read an Int64 column
    /// in a Parquet file as a [`TimestampMicrosecondArray`] in the Arrow schema.
333    ///
334    /// [`arrow`]: crate::arrow
335    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
336    ///
337    /// # Notes
338    ///
339    /// The provided schema must have the same number of columns as the parquet schema and
340    /// the column names must be the same.
341    ///
342    /// # Example
343    /// ```
345    /// use std::sync::Arc;
346    /// use tempfile::tempfile;
347    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
348    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
349    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
350    /// use parquet::arrow::ArrowWriter;
351    ///
352    /// // Write data - schema is inferred from the data to be Int32
353    /// let file = tempfile().unwrap();
354    /// let batch = RecordBatch::try_from_iter(vec![
355    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
356    /// ]).unwrap();
357    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
358    /// writer.write(&batch).unwrap();
359    /// writer.close().unwrap();
360    ///
361    /// // Read the file back.
362    /// // Supply a schema that interprets the Int32 column as a Timestamp.
363    /// let supplied_schema = Arc::new(Schema::new(vec![
364    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
365    /// ]));
366    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
367    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
368    ///     file.try_clone().unwrap(),
369    ///     options
370    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
371    ///
372    /// // Create the reader and read the data using the supplied schema.
373    /// let mut reader = builder.build().unwrap();
374    /// let _batch = reader.next().unwrap().unwrap();
375    /// ```
376    pub fn with_schema(self, schema: SchemaRef) -> Self {
377        Self {
378            supplied_schema: Some(schema),
379            skip_arrow_metadata: true,
380            ..self
381        }
382    }
383
384    /// Enable reading [`PageIndex`], if present (defaults to `false`)
385    ///
    /// Some query engines can use the `PageIndex` to push down predicates to
    /// the parquet scan, potentially eliminating unnecessary IO.
388    ///
389    /// If this is enabled, [`ParquetMetaData::column_index`] and
390    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
391    /// information is present in the file.
392    ///
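    /// # Example
    ///
    /// A small sketch enabling the page index on the reader options:
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    ///
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// assert!(options.page_index());
    /// ```
    ///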
393    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
394    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
395    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
396    pub fn with_page_index(self, page_index: bool) -> Self {
397        Self { page_index, ..self }
398    }
399
400    /// Provide the file decryption properties to use when reading encrypted parquet files.
401    ///
402    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
403    #[cfg(feature = "encryption")]
404    pub fn with_file_decryption_properties(
405        self,
406        file_decryption_properties: FileDecryptionProperties,
407    ) -> Self {
408        Self {
409            file_decryption_properties: Some(file_decryption_properties),
410            ..self
411        }
412    }
413
414    /// Retrieve the currently set page index behavior.
415    ///
416    /// This can be set via [`with_page_index`][Self::with_page_index].
417    pub fn page_index(&self) -> bool {
418        self.page_index
419    }
420
421    /// Retrieve the currently set file decryption properties.
422    ///
423    /// This can be set via
424    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
425    #[cfg(feature = "encryption")]
426    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
427        self.file_decryption_properties.as_ref()
428    }
429}
430
431/// The metadata necessary to construct a [`ArrowReaderBuilder`]
432///
/// Note this structure is cheap to clone as it consists of several [`Arc`]s.
434///
435/// This structure allows
436///
437/// 1. Loading metadata for a file once and then using that same metadata to
438///    construct multiple separate readers, for example, to distribute readers
439///    across multiple threads
440///
/// 2. Using a cached copy of the [`ParquetMetaData`] rather than reading it
///    from the file each time a reader is constructed.
///
445#[derive(Debug, Clone)]
446pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
448    pub(crate) metadata: Arc<ParquetMetaData>,
449    /// The Arrow Schema
450    pub(crate) schema: SchemaRef,
451
452    pub(crate) fields: Option<Arc<ParquetField>>,
453}
454
455impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
457    ///
458    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
459    /// example of how this can be used
460    ///
461    /// # Notes
462    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, but
    /// the metadata is missing the page index, this function will attempt
    /// to load the page index by performing additional reads from the `reader`.
466    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
467        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
468        #[cfg(feature = "encryption")]
469        let metadata =
470            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
471        let metadata = metadata.parse_and_finish(reader)?;
472        Self::try_new(Arc::new(metadata), options)
473    }
474
475    /// Create a new [`ArrowReaderMetadata`]
476    ///
477    /// # Notes
478    ///
    /// This function does not attempt to load the page index if it is not already present in the metadata.
480    /// See [`Self::load`] for more details.
481    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
482        match options.supplied_schema {
483            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
484            None => {
485                let kv_metadata = match options.skip_arrow_metadata {
486                    true => None,
487                    false => metadata.file_metadata().key_value_metadata(),
488                };
489
490                let (schema, fields) = parquet_to_arrow_schema_and_fields(
491                    metadata.file_metadata().schema_descr(),
492                    ProjectionMask::all(),
493                    kv_metadata,
494                )?;
495
496                Ok(Self {
497                    metadata,
498                    schema: Arc::new(schema),
499                    fields: fields.map(Arc::new),
500                })
501            }
502        }
503    }
504
505    fn with_supplied_schema(
506        metadata: Arc<ParquetMetaData>,
507        supplied_schema: SchemaRef,
508    ) -> Result<Self> {
509        let parquet_schema = metadata.file_metadata().schema_descr();
510        let field_levels = parquet_to_arrow_field_levels(
511            parquet_schema,
512            ProjectionMask::all(),
513            Some(supplied_schema.fields()),
514        )?;
515        let fields = field_levels.fields;
516        let inferred_len = fields.len();
517        let supplied_len = supplied_schema.fields().len();
        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
        // different lengths, but we check here to be safe.
521        if inferred_len != supplied_len {
522            Err(arrow_err!(format!(
523                "incompatible arrow schema, expected {} columns received {}",
524                inferred_len, supplied_len
525            )))
526        } else {
527            let diff_fields: Vec<_> = supplied_schema
528                .fields()
529                .iter()
530                .zip(fields.iter())
531                .filter_map(|(field1, field2)| {
532                    if field1 != field2 {
533                        Some(field1.name().clone())
534                    } else {
535                        None
536                    }
537                })
538                .collect();
539
540            if !diff_fields.is_empty() {
541                Err(ParquetError::ArrowError(format!(
542                    "incompatible arrow schema, the following fields could not be cast: [{}]",
543                    diff_fields.join(", ")
544                )))
545            } else {
546                Ok(Self {
547                    metadata,
548                    schema: supplied_schema,
549                    fields: field_levels.levels.map(Arc::new),
550                })
551            }
552        }
553    }
554
555    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
556    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
557        &self.metadata
558    }
559
560    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
561    pub fn parquet_schema(&self) -> &SchemaDescriptor {
562        self.metadata.file_metadata().schema_descr()
563    }
564
565    /// Returns the arrow [`SchemaRef`] for this parquet file
566    pub fn schema(&self) -> &SchemaRef {
567        &self.schema
568    }
569}
570
571#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
573pub struct SyncReader<T: ChunkReader>(T);
574
575/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
576///
577/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
578///
579/// See [`ArrowReaderBuilder`] for additional member functions
580pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
581
582impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
583    /// Create a new [`ParquetRecordBatchReaderBuilder`]
584    ///
585    /// ```
586    /// # use std::sync::Arc;
587    /// # use bytes::Bytes;
588    /// # use arrow_array::{Int32Array, RecordBatch};
589    /// # use arrow_schema::{DataType, Field, Schema};
590    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
591    /// # use parquet::arrow::ArrowWriter;
592    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
593    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
594    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
595    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
596    /// # writer.write(&batch).unwrap();
597    /// # writer.close().unwrap();
598    /// # let file = Bytes::from(file);
599    /// #
600    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
601    ///
602    /// // Inspect metadata
603    /// assert_eq!(builder.metadata().num_row_groups(), 1);
604    ///
605    /// // Construct reader
606    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
607    ///
608    /// // Read data
609    /// let _batch = reader.next().unwrap().unwrap();
610    /// ```
611    pub fn try_new(reader: T) -> Result<Self> {
612        Self::try_new_with_options(reader, Default::default())
613    }
614
615    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
616    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
617        let metadata = ArrowReaderMetadata::load(&reader, options)?;
618        Ok(Self::new_with_metadata(reader, metadata))
619    }
620
621    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
622    ///
623    /// This interface allows:
624    ///
625    /// 1. Loading metadata once and using it to create multiple builders with
626    ///    potentially different settings or run on different threads
627    ///
628    /// 2. Using a cached copy of the metadata rather than re-reading it from the
629    ///    file each time a reader is constructed.
630    ///
631    /// See the docs on [`ArrowReaderMetadata`] for more details
632    ///
633    /// # Example
634    /// ```
635    /// # use std::fs::metadata;
636    /// # use std::sync::Arc;
637    /// # use bytes::Bytes;
638    /// # use arrow_array::{Int32Array, RecordBatch};
639    /// # use arrow_schema::{DataType, Field, Schema};
640    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
641    /// # use parquet::arrow::ArrowWriter;
642    /// #
643    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
644    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
645    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
646    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
647    /// # writer.write(&batch).unwrap();
648    /// # writer.close().unwrap();
649    /// # let file = Bytes::from(file);
650    /// #
651    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
652    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
653    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
654    ///
655    /// // Should be able to read from both in parallel
656    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
657    /// ```
658    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
659        Self::new_builder(SyncReader(input), metadata)
660    }
661
662    /// Build a [`ParquetRecordBatchReader`]
663    ///
664    /// Note: this will eagerly evaluate any `RowFilter` before returning
665    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        // Try to avoid allocating a large buffer
667        let batch_size = self
668            .batch_size
669            .min(self.metadata.file_metadata().num_rows() as usize);
670
671        let row_groups = self
672            .row_groups
673            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
674
675        let reader = ReaderRowGroups {
676            reader: Arc::new(self.input.0),
677            metadata: self.metadata,
678            row_groups,
679        };
680
681        let mut filter = self.filter;
682        let mut selection = self.selection;
683
684        if let Some(filter) = filter.as_mut() {
685            for predicate in filter.predicates.iter_mut() {
686                if !selects_any(selection.as_ref()) {
687                    break;
688                }
689
690                let array_reader =
691                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
692
693                selection = Some(evaluate_predicate(
694                    batch_size,
695                    array_reader,
696                    selection,
697                    predicate.as_mut(),
698                )?);
699            }
700        }
701
702        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
703
        // If the selection selects no rows, use an empty selection so nothing is read
705        if !selects_any(selection.as_ref()) {
706            selection = Some(RowSelection::from(vec![]));
707        }
708
709        Ok(ParquetRecordBatchReader::new(
710            batch_size,
711            array_reader,
712            apply_range(selection, reader.num_rows(), self.offset, self.limit),
713        ))
714    }
715}
716
717struct ReaderRowGroups<T: ChunkReader> {
718    reader: Arc<T>,
719
720    metadata: Arc<ParquetMetaData>,
    /// The row group indices to scan
722    row_groups: Vec<usize>,
723}
724
725impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
726    fn num_rows(&self) -> usize {
727        let meta = self.metadata.row_groups();
728        self.row_groups
729            .iter()
730            .map(|x| meta[*x].num_rows() as usize)
731            .sum()
732    }
733
734    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
735        Ok(Box::new(ReaderPageIterator {
736            column_idx: i,
737            reader: self.reader.clone(),
738            metadata: self.metadata.clone(),
739            row_groups: self.row_groups.clone().into_iter(),
740        }))
741    }
742}
743
744struct ReaderPageIterator<T: ChunkReader> {
745    reader: Arc<T>,
746    column_idx: usize,
747    row_groups: std::vec::IntoIter<usize>,
748    metadata: Arc<ParquetMetaData>,
749}
750
751impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
752    /// Return the next SerializedPageReader
753    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
754        let rg = self.metadata.row_group(rg_idx);
755        let column_chunk_metadata = rg.column(self.column_idx);
756        let offset_index = self.metadata.offset_index();
        // The `offset_index` may not exist, in which case `i[rg_idx]` will be empty.
        // To avoid a panic on `i[rg_idx][self.column_idx]`, filter out empty `i[rg_idx]`.
759        let page_locations = offset_index
760            .filter(|i| !i[rg_idx].is_empty())
761            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
762        let total_rows = rg.num_rows() as usize;
763        let reader = self.reader.clone();
764
765        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
766            .add_crypto_context(
767                rg_idx,
768                self.column_idx,
769                self.metadata.as_ref(),
770                column_chunk_metadata,
771            )
772    }
773}
774
775impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
776    type Item = Result<Box<dyn PageReader>>;
777
778    fn next(&mut self) -> Option<Self::Item> {
779        let rg_idx = self.row_groups.next()?;
780        let page_reader = self
781            .next_page_reader(rg_idx)
782            .map(|page_reader| Box::new(page_reader) as _);
783        Some(page_reader)
784    }
785}
786
787impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
788
/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]es
/// read from a parquet data source
791pub struct ParquetRecordBatchReader {
792    batch_size: usize,
793    array_reader: Box<dyn ArrayReader>,
794    schema: SchemaRef,
795    /// Row ranges to be selected from the data source
796    selection: Option<VecDeque<RowSelector>>,
797}
798
799impl Iterator for ParquetRecordBatchReader {
800    type Item = Result<RecordBatch, ArrowError>;
801
802    fn next(&mut self) -> Option<Self::Item> {
803        self.next_inner()
804            .map_err(|arrow_err| arrow_err.into())
805            .transpose()
806    }
807}
808
809impl ParquetRecordBatchReader {
810    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
811    /// has reached the end of the file.
812    ///
813    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
814    /// simplify error handling with `?`
815    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
816        let mut read_records = 0;
817        match self.selection.as_mut() {
818            Some(selection) => {
819                while read_records < self.batch_size && !selection.is_empty() {
820                    let front = selection.pop_front().unwrap();
821                    if front.skip {
822                        let skipped = self.array_reader.skip_records(front.row_count)?;
823
824                        if skipped != front.row_count {
825                            return Err(general_err!(
826                                "failed to skip rows, expected {}, got {}",
827                                front.row_count,
828                                skipped
829                            ));
830                        }
831                        continue;
832                    }
833
                    // Currently, a RowSelector with row_count == 0 would otherwise be interpreted
                    // as the end of the reader, so skip such entries. See https://github.com/apache/arrow-rs/issues/2669
836                    if front.row_count == 0 {
837                        continue;
838                    }
839
                    // Try to read enough records to fill the remainder of this batch
841                    let need_read = self.batch_size - read_records;
842                    let to_read = match front.row_count.checked_sub(need_read) {
843                        Some(remaining) if remaining != 0 => {
                            // The selector covers more rows than are needed to fill this batch:
                            // read only what is needed and push the remainder back onto the
                            // selection (the `remaining != 0` guard avoids pushing an empty selector).
846                            selection.push_front(RowSelector::select(remaining));
847                            need_read
848                        }
849                        _ => front.row_count,
850                    };
851                    match self.array_reader.read_records(to_read)? {
852                        0 => break,
853                        rec => read_records += rec,
854                    };
855                }
856            }
857            None => {
858                self.array_reader.read_records(self.batch_size)?;
859            }
860        };
861
862        let array = self.array_reader.consume_batch()?;
863        let struct_array = array.as_struct_opt().ok_or_else(|| {
864            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
865        })?;
866
867        Ok(if struct_array.len() > 0 {
868            Some(RecordBatch::from(struct_array))
869        } else {
870            None
871        })
872    }
873}
874
875impl RecordBatchReader for ParquetRecordBatchReader {
876    /// Returns the projected [`SchemaRef`] for reading the parquet file.
877    ///
878    /// Note that the schema metadata will be stripped here. See
879    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
880    fn schema(&self) -> SchemaRef {
881        self.schema.clone()
882    }
883}
884
885impl ParquetRecordBatchReader {
886    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
887    ///
888    /// See [`ParquetRecordBatchReaderBuilder`] for more options
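    ///
    /// # Example
    ///
    /// A brief sketch reading all batches from an in-memory buffer with a
    /// batch size of 1024:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    /// # let mut buf = Vec::new();
    /// # let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// let reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
    /// for batch in reader {
    ///     println!("read {} rows", batch.unwrap().num_rows());
    /// }
    /// ```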
889    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
890        ParquetRecordBatchReaderBuilder::try_new(reader)?
891            .with_batch_size(batch_size)
892            .build()
893    }
894
895    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
896    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
    /// higher-level interface for reading parquet data from a file
899    pub fn try_new_with_row_groups(
900        levels: &FieldLevels,
901        row_groups: &dyn RowGroups,
902        batch_size: usize,
903        selection: Option<RowSelection>,
904    ) -> Result<Self> {
905        let array_reader =
906            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
907
908        Ok(Self {
909            batch_size,
910            array_reader,
911            schema: Arc::new(Schema::new(levels.fields.clone())),
912            selection: selection.map(|s| s.trim().into()),
913        })
914    }
915
    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
    /// a time from the provided [`ArrayReader`], based on the configured `selection`. If
    /// `selection` is `None`, all rows will be returned
919    pub(crate) fn new(
920        batch_size: usize,
921        array_reader: Box<dyn ArrayReader>,
922        selection: Option<RowSelection>,
923    ) -> Self {
924        let schema = match array_reader.get_data_type() {
925            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
926            _ => unreachable!("Struct array reader's data type is not struct!"),
927        };
928
929        Self {
930            batch_size,
931            array_reader,
932            schema: Arc::new(schema),
933            selection: selection.map(|s| s.trim().into()),
934        }
935    }
936}
937
938/// Returns `true` if `selection` is `None` or selects some rows
939pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
940    selection.map(|x| x.selects_any()).unwrap_or(true)
941}
942
943/// Applies an optional offset and limit to an optional [`RowSelection`]
944pub(crate) fn apply_range(
945    mut selection: Option<RowSelection>,
946    row_count: usize,
947    offset: Option<usize>,
948    limit: Option<usize>,
949) -> Option<RowSelection> {
950    // If an offset is defined, apply it to the `selection`
951    if let Some(offset) = offset {
952        selection = Some(match row_count.checked_sub(offset) {
953            None => RowSelection::from(vec![]),
954            Some(remaining) => selection
955                .map(|selection| selection.offset(offset))
956                .unwrap_or_else(|| {
957                    RowSelection::from(vec![
958                        RowSelector::skip(offset),
959                        RowSelector::select(remaining),
960                    ])
961                }),
962        });
963    }
964
965    // If a limit is defined, apply it to the final `selection`
966    if let Some(limit) = limit {
967        selection = Some(
968            selection
969                .map(|selection| selection.limit(limit))
970                .unwrap_or_else(|| {
971                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
972                }),
973        );
974    }
975    selection
976}
977
978/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
979/// which rows to return.
980///
981/// `input_selection`: Optional pre-existing selection. If `Some`, then the
982/// final [`RowSelection`] will be the conjunction of it and the rows selected
983/// by `predicate`.
984///
/// Note: A pre-existing selection may come from evaluating a previous predicate,
/// or from an explicit [`RowSelection`] provided to the [`ParquetRecordBatchReader`]
/// in addition to one or more predicates.
988pub(crate) fn evaluate_predicate(
989    batch_size: usize,
990    array_reader: Box<dyn ArrayReader>,
991    input_selection: Option<RowSelection>,
992    predicate: &mut dyn ArrowPredicate,
993) -> Result<RowSelection> {
994    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
995    let mut filters = vec![];
996    for maybe_batch in reader {
997        let maybe_batch = maybe_batch?;
998        let input_rows = maybe_batch.num_rows();
999        let filter = predicate.evaluate(maybe_batch)?;
        // Since the predicate is user supplied, validate the returned length here to catch bugs early
1001        if filter.len() != input_rows {
1002            return Err(arrow_err!(
1003                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
1004                filter.len()
1005            ));
1006        }
1007        match filter.null_count() {
1008            0 => filters.push(filter),
1009            _ => filters.push(prep_null_mask_filter(&filter)),
1010        };
1011    }
1012
1013    let raw = RowSelection::from_filters(&filters);
1014    Ok(match input_selection {
1015        Some(selection) => selection.and_then(&raw),
1016        None => raw,
1017    })
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022    use std::cmp::min;
1023    use std::collections::{HashMap, VecDeque};
1024    use std::fmt::Formatter;
1025    use std::fs::File;
1026    use std::io::Seek;
1027    use std::path::PathBuf;
1028    use std::sync::Arc;
1029
1030    use arrow_array::builder::*;
1031    use arrow_array::cast::AsArray;
1032    use arrow_array::types::{
1033        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
1034        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
1035    };
1036    use arrow_array::*;
1037    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
1038    use arrow_data::{ArrayData, ArrayDataBuilder};
1039    use arrow_schema::{
1040        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1041    };
1042    use arrow_select::concat::concat_batches;
1043    use bytes::Bytes;
1044    use half::f16;
1045    use num::PrimInt;
1046    use rand::{rng, Rng, RngCore};
1047    use tempfile::tempfile;
1048
1049    use crate::arrow::arrow_reader::{
1050        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1051        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1052    };
1053    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1054    use crate::arrow::{ArrowWriter, ProjectionMask};
1055    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1056    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1057    use crate::data_type::{
1058        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1059        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1060    };
1061    use crate::errors::Result;
1062    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1063    use crate::file::writer::SerializedFileWriter;
1064    use crate::schema::parser::parse_message_type;
1065    use crate::schema::types::{Type, TypePtr};
1066    use crate::util::test_common::rand_gen::RandGen;
1067
1068    #[test]
1069    fn test_arrow_reader_all_columns() {
1070        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1071
1072        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1073        let original_schema = Arc::clone(builder.schema());
1074        let reader = builder.build().unwrap();
1075
1076        // Verify that the schema was correctly parsed
1077        assert_eq!(original_schema.fields(), reader.schema().fields());
1078    }
1079
1080    #[test]
1081    fn test_arrow_reader_single_column() {
1082        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1083
1084        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1085        let original_schema = Arc::clone(builder.schema());
1086
1087        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1088        let reader = builder.with_projection(mask).build().unwrap();
1089
1090        // Verify that the schema was correctly parsed
1091        assert_eq!(1, reader.schema().fields().len());
1092        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1093    }
1094
1095    #[test]
1096    fn test_arrow_reader_single_column_by_name() {
1097        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1098
1099        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1100        let original_schema = Arc::clone(builder.schema());
1101
1102        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1103        let reader = builder.with_projection(mask).build().unwrap();
1104
1105        // Verify that the schema was correctly parsed
1106        assert_eq!(1, reader.schema().fields().len());
1107        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1108    }
1109
1110    #[test]
1111    fn test_null_column_reader_test() {
1112        let mut file = tempfile::tempfile().unwrap();
1113
1114        let schema = "
1115            message message {
1116                OPTIONAL INT32 int32;
1117            }
1118        ";
1119        let schema = Arc::new(parse_message_type(schema).unwrap());
1120
1121        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1122        generate_single_column_file_with_data::<Int32Type>(
1123            &[vec![], vec![]],
1124            Some(&def_levels),
1125            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1126            schema,
1127            Some(Field::new("int32", ArrowDataType::Null, true)),
1128            &Default::default(),
1129        )
1130        .unwrap();
1131
1132        file.rewind().unwrap();
1133
1134        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1135        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1136
1137        assert_eq!(batches.len(), 4);
1138        for batch in &batches[0..3] {
1139            assert_eq!(batch.num_rows(), 2);
1140            assert_eq!(batch.num_columns(), 1);
1141            assert_eq!(batch.column(0).null_count(), 2);
1142        }
1143
1144        assert_eq!(batches[3].num_rows(), 1);
1145        assert_eq!(batches[3].num_columns(), 1);
1146        assert_eq!(batches[3].column(0).null_count(), 1);
1147    }
1148
1149    #[test]
1150    fn test_primitive_single_column_reader_test() {
1151        run_single_column_reader_tests::<BoolType, _, BoolType>(
1152            2,
1153            ConvertedType::NONE,
1154            None,
1155            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1156            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1157        );
1158        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1159            2,
1160            ConvertedType::NONE,
1161            None,
1162            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1163            &[
1164                Encoding::PLAIN,
1165                Encoding::RLE_DICTIONARY,
1166                Encoding::DELTA_BINARY_PACKED,
1167                Encoding::BYTE_STREAM_SPLIT,
1168            ],
1169        );
1170        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1171            2,
1172            ConvertedType::NONE,
1173            None,
1174            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1175            &[
1176                Encoding::PLAIN,
1177                Encoding::RLE_DICTIONARY,
1178                Encoding::DELTA_BINARY_PACKED,
1179                Encoding::BYTE_STREAM_SPLIT,
1180            ],
1181        );
1182        run_single_column_reader_tests::<FloatType, _, FloatType>(
1183            2,
1184            ConvertedType::NONE,
1185            None,
1186            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1187            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1188        );
1189    }
1190
1191    #[test]
1192    fn test_unsigned_primitive_single_column_reader_test() {
1193        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1194            2,
1195            ConvertedType::UINT_32,
1196            Some(ArrowDataType::UInt32),
1197            |vals| {
1198                Arc::new(UInt32Array::from_iter(
1199                    vals.iter().map(|x| x.map(|x| x as u32)),
1200                ))
1201            },
1202            &[
1203                Encoding::PLAIN,
1204                Encoding::RLE_DICTIONARY,
1205                Encoding::DELTA_BINARY_PACKED,
1206            ],
1207        );
1208        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1209            2,
1210            ConvertedType::UINT_64,
1211            Some(ArrowDataType::UInt64),
1212            |vals| {
1213                Arc::new(UInt64Array::from_iter(
1214                    vals.iter().map(|x| x.map(|x| x as u64)),
1215                ))
1216            },
1217            &[
1218                Encoding::PLAIN,
1219                Encoding::RLE_DICTIONARY,
1220                Encoding::DELTA_BINARY_PACKED,
1221            ],
1222        );
1223    }
1224
1225    #[test]
1226    fn test_unsigned_roundtrip() {
1227        let schema = Arc::new(Schema::new(vec![
1228            Field::new("uint32", ArrowDataType::UInt32, true),
1229            Field::new("uint64", ArrowDataType::UInt64, true),
1230        ]));
1231
1232        let mut buf = Vec::with_capacity(1024);
1233        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1234
1235        let original = RecordBatch::try_new(
1236            schema,
1237            vec![
1238                Arc::new(UInt32Array::from_iter_values([
1239                    0,
1240                    i32::MAX as u32,
1241                    u32::MAX,
1242                ])),
1243                Arc::new(UInt64Array::from_iter_values([
1244                    0,
1245                    i64::MAX as u64,
1246                    u64::MAX,
1247                ])),
1248            ],
1249        )
1250        .unwrap();
1251
1252        writer.write(&original).unwrap();
1253        writer.close().unwrap();
1254
1255        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1256        let ret = reader.next().unwrap().unwrap();
1257        assert_eq!(ret, original);
1258
1259        // Check they can be downcast to the correct type
1260        ret.column(0)
1261            .as_any()
1262            .downcast_ref::<UInt32Array>()
1263            .unwrap();
1264
1265        ret.column(1)
1266            .as_any()
1267            .downcast_ref::<UInt64Array>()
1268            .unwrap();
1269    }
1270
1271    #[test]
1272    fn test_float16_roundtrip() -> Result<()> {
1273        let schema = Arc::new(Schema::new(vec![
1274            Field::new("float16", ArrowDataType::Float16, false),
1275            Field::new("float16-nullable", ArrowDataType::Float16, true),
1276        ]));
1277
1278        let mut buf = Vec::with_capacity(1024);
1279        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1280
1281        let original = RecordBatch::try_new(
1282            schema,
1283            vec![
1284                Arc::new(Float16Array::from_iter_values([
1285                    f16::EPSILON,
1286                    f16::MIN,
1287                    f16::MAX,
1288                    f16::NAN,
1289                    f16::INFINITY,
1290                    f16::NEG_INFINITY,
1291                    f16::ONE,
1292                    f16::NEG_ONE,
1293                    f16::ZERO,
1294                    f16::NEG_ZERO,
1295                    f16::E,
1296                    f16::PI,
1297                    f16::FRAC_1_PI,
1298                ])),
1299                Arc::new(Float16Array::from(vec![
1300                    None,
1301                    None,
1302                    None,
1303                    Some(f16::NAN),
1304                    Some(f16::INFINITY),
1305                    Some(f16::NEG_INFINITY),
1306                    None,
1307                    None,
1308                    None,
1309                    None,
1310                    None,
1311                    None,
1312                    Some(f16::FRAC_1_PI),
1313                ])),
1314            ],
1315        )?;
1316
1317        writer.write(&original)?;
1318        writer.close()?;
1319
1320        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1321        let ret = reader.next().unwrap()?;
1322        assert_eq!(ret, original);
1323
1324        // Ensure can be downcast to the correct type
1325        ret.column(0).as_primitive::<Float16Type>();
1326        ret.column(1).as_primitive::<Float16Type>();
1327
1328        Ok(())
1329    }
1330
1331    #[test]
1332    fn test_time_utc_roundtrip() -> Result<()> {
1333        let schema = Arc::new(Schema::new(vec![
1334            Field::new(
1335                "time_millis",
1336                ArrowDataType::Time32(TimeUnit::Millisecond),
1337                true,
1338            )
1339            .with_metadata(HashMap::from_iter(vec![(
1340                "adjusted_to_utc".to_string(),
1341                "".to_string(),
1342            )])),
1343            Field::new(
1344                "time_micros",
1345                ArrowDataType::Time64(TimeUnit::Microsecond),
1346                true,
1347            )
1348            .with_metadata(HashMap::from_iter(vec![(
1349                "adjusted_to_utc".to_string(),
1350                "".to_string(),
1351            )])),
1352        ]));
1353
1354        let mut buf = Vec::with_capacity(1024);
1355        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1356
1357        let original = RecordBatch::try_new(
1358            schema,
1359            vec![
1360                Arc::new(Time32MillisecondArray::from(vec![
1361                    Some(-1),
1362                    Some(0),
1363                    Some(86_399_000),
1364                    Some(86_400_000),
1365                    Some(86_401_000),
1366                    None,
1367                ])),
1368                Arc::new(Time64MicrosecondArray::from(vec![
1369                    Some(-1),
1370                    Some(0),
1371                    Some(86_399 * 1_000_000),
1372                    Some(86_400 * 1_000_000),
1373                    Some(86_401 * 1_000_000),
1374                    None,
1375                ])),
1376            ],
1377        )?;
1378
1379        writer.write(&original)?;
1380        writer.close()?;
1381
1382        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1383        let ret = reader.next().unwrap()?;
1384        assert_eq!(ret, original);
1385
1386        // Ensure can be downcast to the correct type
1387        ret.column(0).as_primitive::<Time32MillisecondType>();
1388        ret.column(1).as_primitive::<Time64MicrosecondType>();
1389
1390        Ok(())
1391    }
1392
1393    #[test]
1394    fn test_date32_roundtrip() -> Result<()> {
1395        use arrow_array::Date32Array;
1396
1397        let schema = Arc::new(Schema::new(vec![Field::new(
1398            "date32",
1399            ArrowDataType::Date32,
1400            false,
1401        )]));
1402
1403        let mut buf = Vec::with_capacity(1024);
1404
1405        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1406
1407        let original = RecordBatch::try_new(
1408            schema,
1409            vec![Arc::new(Date32Array::from(vec![
1410                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1411            ]))],
1412        )?;
1413
1414        writer.write(&original)?;
1415        writer.close()?;
1416
1417        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1418        let ret = reader.next().unwrap()?;
1419        assert_eq!(ret, original);
1420
1421        // Ensure the column can be downcast to the correct type
1422        ret.column(0).as_primitive::<Date32Type>();
1423
1424        Ok(())
1425    }
1426
1427    #[test]
1428    fn test_date64_roundtrip() -> Result<()> {
1429        use arrow_array::Date64Array;
1430
1431        let schema = Arc::new(Schema::new(vec![
1432            Field::new("small-date64", ArrowDataType::Date64, false),
1433            Field::new("big-date64", ArrowDataType::Date64, false),
1434            Field::new("invalid-date64", ArrowDataType::Date64, false),
1435        ]));
1436
1437        let mut default_buf = Vec::with_capacity(1024);
1438        let mut coerce_buf = Vec::with_capacity(1024);
1439
1440        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1441
1442        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1443        let mut coerce_writer =
1444            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1445
1446        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1447
1448        let original = RecordBatch::try_new(
1449            schema,
1450            vec![
1451                // small-date64
1452                Arc::new(Date64Array::from(vec![
1453                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1454                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1455                    0,
1456                    1_000 * NUM_MILLISECONDS_IN_DAY,
1457                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1458                ])),
1459                // big-date64
1460                Arc::new(Date64Array::from(vec![
1461                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1462                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1463                    0,
1464                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1465                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1466                ])),
1467                // invalid-date64
1468                Arc::new(Date64Array::from(vec![
1469                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1470                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1471                    1,
1472                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1473                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1474                ])),
1475            ],
1476        )?;
1477
1478        default_writer.write(&original)?;
1479        coerce_writer.write(&original)?;
1480
1481        default_writer.close()?;
1482        coerce_writer.close()?;
1483
1484        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1485        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1486
1487        let default_ret = default_reader.next().unwrap()?;
1488        let coerce_ret = coerce_reader.next().unwrap()?;
1489
1490        // Roundtrip should be successful when the default writer is used
1491        assert_eq!(default_ret, original);
1492
1493        // Only small-date64 should roundtrip successfully when the coerce_types writer is used
1494        assert_eq!(coerce_ret.column(0), original.column(0));
1495        assert_ne!(coerce_ret.column(1), original.column(1));
1496        assert_ne!(coerce_ret.column(2), original.column(2));
1497
1498        // Ensure both can be downcast to the correct type
1499        default_ret.column(0).as_primitive::<Date64Type>();
1500        coerce_ret.column(0).as_primitive::<Date64Type>();
1501
1502        Ok(())
1503    }

1504    struct RandFixedLenGen {}
1505
1506    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1507        fn gen(len: i32) -> FixedLenByteArray {
1508            let mut v = vec![0u8; len as usize];
1509            rng().fill_bytes(&mut v);
1510            ByteArray::from(v).into()
1511        }
1512    }
1513
1514    #[test]
1515    fn test_fixed_length_binary_column_reader() {
1516        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1517            20,
1518            ConvertedType::NONE,
1519            None,
1520            |vals| {
1521                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1522                for val in vals {
1523                    match val {
1524                        Some(b) => builder.append_value(b).unwrap(),
1525                        None => builder.append_null(),
1526                    }
1527                }
1528                Arc::new(builder.finish())
1529            },
1530            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1531        );
1532    }
1533
1534    #[test]
1535    fn test_interval_day_time_column_reader() {
1536        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1537            12,
1538            ConvertedType::INTERVAL,
1539            None,
1540            |vals| {
1541                Arc::new(
1542                    vals.iter()
1543                        .map(|x| {
1544                            x.as_ref().map(|b| IntervalDayTime {
1545                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1546                                milliseconds: i32::from_le_bytes(
1547                                    b.as_ref()[8..12].try_into().unwrap(),
1548                                ),
1549                            })
1550                        })
1551                        .collect::<IntervalDayTimeArray>(),
1552                )
1553            },
1554            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1555        );
1556    }
1557
1558    #[test]
1559    fn test_int96_single_column_reader_test() {
1560        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1561
1562        type TypeHintAndConversionFunction =
1563            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1564
1565        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1566            // Test without a specified ArrowType hint.
1567            (None, |vals: &[Option<Int96>]| {
1568                Arc::new(TimestampNanosecondArray::from_iter(
1569                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1570                )) as ArrayRef
1571            }),
1572            // Test other TimeUnits as ArrowType hints.
1573            (
1574                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1575                |vals: &[Option<Int96>]| {
1576                    Arc::new(TimestampSecondArray::from_iter(
1577                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1578                    )) as ArrayRef
1579                },
1580            ),
1581            (
1582                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1583                |vals: &[Option<Int96>]| {
1584                    Arc::new(TimestampMillisecondArray::from_iter(
1585                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1586                    )) as ArrayRef
1587                },
1588            ),
1589            (
1590                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1591                |vals: &[Option<Int96>]| {
1592                    Arc::new(TimestampMicrosecondArray::from_iter(
1593                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1594                    )) as ArrayRef
1595                },
1596            ),
1597            (
1598                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1599                |vals: &[Option<Int96>]| {
1600                    Arc::new(TimestampNanosecondArray::from_iter(
1601                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1602                    )) as ArrayRef
1603                },
1604            ),
1605            // Test another timezone with TimeUnit as ArrowType hints.
1606            (
1607                Some(ArrowDataType::Timestamp(
1608                    TimeUnit::Second,
1609                    Some(Arc::from("-05:00")),
1610                )),
1611                |vals: &[Option<Int96>]| {
1612                    Arc::new(
1613                        TimestampSecondArray::from_iter(
1614                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1615                        )
1616                        .with_timezone("-05:00"),
1617                    ) as ArrayRef
1618                },
1619            ),
1620        ];
1621
1622        resolutions.iter().for_each(|(arrow_type, converter)| {
1623            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1624                2,
1625                ConvertedType::NONE,
1626                arrow_type.clone(),
1627                converter,
1628                encodings,
1629            );
1630        })
1631    }
1632
1633    #[test]
1634    fn test_int96_from_spark_file_with_provided_schema() {
1635        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
1636        // resolution for range compared to a nanosecond timestamp. We must provide a schema with
1637        // microsecond resolution for the Parquet reader to interpret these values correctly.
1638        use arrow_schema::DataType::Timestamp;
1639        let test_data = arrow::util::test_util::parquet_test_data();
1640        let path = format!("{test_data}/int96_from_spark.parquet");
1641        let file = File::open(path).unwrap();
1642
1643        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1644            "a",
1645            Timestamp(TimeUnit::Microsecond, None),
1646            true,
1647        )]));
1648        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1649
1650        let mut record_reader =
1651            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1652                .unwrap()
1653                .build()
1654                .unwrap();
1655
1656        let batch = record_reader.next().unwrap().unwrap();
1657        assert_eq!(batch.num_columns(), 1);
1658        let column = batch.column(0);
1659        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1660
1661        let expected = Arc::new(Int64Array::from(vec![
1662            Some(1704141296123456),
1663            Some(1704070800000000),
1664            Some(253402225200000000),
1665            Some(1735599600000000),
1666            None,
1667            Some(9089380393200000000),
1668        ]));
1669
1670        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1671        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1672        // anyway, so this should be a zero-copy non-modifying cast.
1673
1674        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1675        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1676
1677        assert_eq!(casted_timestamps.len(), expected.len());
1678
1679        casted_timestamps
1680            .iter()
1681            .zip(expected.iter())
1682            .for_each(|(lhs, rhs)| {
1683                assert_eq!(lhs, rhs);
1684            });
1685    }
1686
1687    #[test]
1688    fn test_int96_from_spark_file_without_provided_schema() {
1689        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
1690        // resolution for range compared to a nanosecond timestamp. Without a provided schema, some
1691        // values overflow when read at nanosecond resolution and result in garbage values.
1692        use arrow_schema::DataType::Timestamp;
1693        let test_data = arrow::util::test_util::parquet_test_data();
1694        let path = format!("{test_data}/int96_from_spark.parquet");
1695        let file = File::open(path).unwrap();
1696
1697        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1698            .unwrap()
1699            .build()
1700            .unwrap();
1701
1702        let batch = record_reader.next().unwrap().unwrap();
1703        assert_eq!(batch.num_columns(), 1);
1704        let column = batch.column(0);
1705        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1706
1707        let expected = Arc::new(Int64Array::from(vec![
1708            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1709            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1710            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1711            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1712            None,
1713            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1714        ]));
1715
1716        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1717        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1718        // anyway, so this should be a zero-copy non-modifying cast.
1719
1720        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1721        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1722
1723        assert_eq!(casted_timestamps.len(), expected.len());
1724
1725        casted_timestamps
1726            .iter()
1727            .zip(expected.iter())
1728            .for_each(|(lhs, rhs)| {
1729                assert_eq!(lhs, rhs);
1730            });
1731    }
1732
1733    struct RandUtf8Gen {}
1734
1735    impl RandGen<ByteArrayType> for RandUtf8Gen {
1736        fn gen(len: i32) -> ByteArray {
1737            Int32Type::gen(len).to_string().as_str().into()
1738        }
1739    }
1740
1741    #[test]
1742    fn test_utf8_single_column_reader_test() {
1743        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1744            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1745                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1746            })))
1747        }
1748
1749        let encodings = &[
1750            Encoding::PLAIN,
1751            Encoding::RLE_DICTIONARY,
1752            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1753            Encoding::DELTA_BYTE_ARRAY,
1754        ];
1755
1756        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1757            2,
1758            ConvertedType::NONE,
1759            None,
1760            |vals| {
1761                Arc::new(BinaryArray::from_iter(
1762                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1763                ))
1764            },
1765            encodings,
1766        );
1767
1768        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1769            2,
1770            ConvertedType::UTF8,
1771            None,
1772            string_converter::<i32>,
1773            encodings,
1774        );
1775
1776        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1777            2,
1778            ConvertedType::UTF8,
1779            Some(ArrowDataType::Utf8),
1780            string_converter::<i32>,
1781            encodings,
1782        );
1783
1784        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1785            2,
1786            ConvertedType::UTF8,
1787            Some(ArrowDataType::LargeUtf8),
1788            string_converter::<i64>,
1789            encodings,
1790        );
1791
1792        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1793        for key in &small_key_types {
1794            for encoding in encodings {
1795                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1796                opts.encoding = *encoding;
1797
1798                let data_type =
1799                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1800
1801                // Cannot run full test suite as keys overflow, run small test instead
1802                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1803                    opts,
1804                    2,
1805                    ConvertedType::UTF8,
1806                    Some(data_type.clone()),
1807                    move |vals| {
1808                        let vals = string_converter::<i32>(vals);
1809                        arrow::compute::cast(&vals, &data_type).unwrap()
1810                    },
1811                );
1812            }
1813        }
1814
1815        let key_types = [
1816            ArrowDataType::Int16,
1817            ArrowDataType::UInt16,
1818            ArrowDataType::Int32,
1819            ArrowDataType::UInt32,
1820            ArrowDataType::Int64,
1821            ArrowDataType::UInt64,
1822        ];
1823
1824        for key in &key_types {
1825            let data_type =
1826                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1827
1828            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1829                2,
1830                ConvertedType::UTF8,
1831                Some(data_type.clone()),
1832                move |vals| {
1833                    let vals = string_converter::<i32>(vals);
1834                    arrow::compute::cast(&vals, &data_type).unwrap()
1835                },
1836                encodings,
1837            );
1838
1839            // https://github.com/apache/arrow-rs/issues/1179
1840            // let data_type = ArrowDataType::Dictionary(
1841            //     Box::new(key.clone()),
1842            //     Box::new(ArrowDataType::LargeUtf8),
1843            // );
1844            //
1845            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1846            //     2,
1847            //     ConvertedType::UTF8,
1848            //     Some(data_type.clone()),
1849            //     move |vals| {
1850            //         let vals = string_converter::<i64>(vals);
1851            //         arrow::compute::cast(&vals, &data_type).unwrap()
1852            //     },
1853            //     encodings,
1854            // );
1855        }
1856    }
1857
1858    #[test]
1859    fn test_decimal_nullable_struct() {
1860        let decimals = Decimal256Array::from_iter_values(
1861            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1862        );
1863
1864        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1865            "decimals",
1866            decimals.data_type().clone(),
1867            false,
1868        )])))
1869        .len(8)
1870        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1871        .child_data(vec![decimals.into_data()])
1872        .build()
1873        .unwrap();
1874
1875        let written =
1876            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1877                .unwrap();
1878
1879        let mut buffer = Vec::with_capacity(1024);
1880        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1881        writer.write(&written).unwrap();
1882        writer.close().unwrap();
1883
1884        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1885            .unwrap()
1886            .collect::<Result<Vec<_>, _>>()
1887            .unwrap();
1888
1889        assert_eq!(&written.slice(0, 3), &read[0]);
1890        assert_eq!(&written.slice(3, 3), &read[1]);
1891        assert_eq!(&written.slice(6, 2), &read[2]);
1892    }
1893
1894    #[test]
1895    fn test_int32_nullable_struct() {
1896        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1897        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1898            "int32",
1899            int32.data_type().clone(),
1900            false,
1901        )])))
1902        .len(8)
1903        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1904        .child_data(vec![int32.into_data()])
1905        .build()
1906        .unwrap();
1907
1908        let written =
1909            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1910                .unwrap();
1911
1912        let mut buffer = Vec::with_capacity(1024);
1913        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1914        writer.write(&written).unwrap();
1915        writer.close().unwrap();
1916
1917        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1918            .unwrap()
1919            .collect::<Result<Vec<_>, _>>()
1920            .unwrap();
1921
1922        assert_eq!(&written.slice(0, 3), &read[0]);
1923        assert_eq!(&written.slice(3, 3), &read[1]);
1924        assert_eq!(&written.slice(6, 2), &read[2]);
1925    }
1926
1927    #[test]
1928    fn test_decimal_list() {
1929        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1930
1931        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
1932        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1933            decimals.data_type().clone(),
1934            false,
1935        ))))
1936        .len(7)
1937        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1938        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1939        .child_data(vec![decimals.into_data()])
1940        .build()
1941        .unwrap();
1942
1943        let written =
1944            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1945                .unwrap();
1946
1947        let mut buffer = Vec::with_capacity(1024);
1948        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1949        writer.write(&written).unwrap();
1950        writer.close().unwrap();
1951
1952        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1953            .unwrap()
1954            .collect::<Result<Vec<_>, _>>()
1955            .unwrap();
1956
1957        assert_eq!(&written.slice(0, 3), &read[0]);
1958        assert_eq!(&written.slice(3, 3), &read[1]);
1959        assert_eq!(&written.slice(6, 1), &read[2]);
1960    }
1961
1962    #[test]
1963    fn test_read_decimal_file() {
1964        use arrow_array::Decimal128Array;
1965        let testdata = arrow::util::test_util::parquet_test_data();
1966        let file_variants = vec![
1967            ("byte_array", 4),
1968            ("fixed_length", 25),
1969            ("int32", 4),
1970            ("int64", 10),
1971        ];
1972        for (prefix, target_precision) in file_variants {
1973            let path = format!("{testdata}/{prefix}_decimal.parquet");
1974            let file = File::open(path).unwrap();
1975            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1976
1977            let batch = record_reader.next().unwrap().unwrap();
1978            assert_eq!(batch.num_rows(), 24);
1979            let col = batch
1980                .column(0)
1981                .as_any()
1982                .downcast_ref::<Decimal128Array>()
1983                .unwrap();
1984
1985            let expected = 1..25;
1986
1987            assert_eq!(col.precision(), target_precision);
1988            assert_eq!(col.scale(), 2);
1989
1990            for (i, v) in expected.enumerate() {
1991                assert_eq!(col.value(i), v * 100_i128);
1992            }
1993        }
1994    }
1995
1996    #[test]
1997    fn test_read_float16_nonzeros_file() {
1998        use arrow_array::Float16Array;
1999        let testdata = arrow::util::test_util::parquet_test_data();
2000        // see https://github.com/apache/parquet-testing/pull/40
2001        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2002        let file = File::open(path).unwrap();
2003        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2004
2005        let batch = record_reader.next().unwrap().unwrap();
2006        assert_eq!(batch.num_rows(), 8);
2007        let col = batch
2008            .column(0)
2009            .as_any()
2010            .downcast_ref::<Float16Array>()
2011            .unwrap();
2012
2013        let f16_two = f16::ONE + f16::ONE;
2014
2015        assert_eq!(col.null_count(), 1);
2016        assert!(col.is_null(0));
2017        assert_eq!(col.value(1), f16::ONE);
2018        assert_eq!(col.value(2), -f16_two);
2019        assert!(col.value(3).is_nan());
2020        assert_eq!(col.value(4), f16::ZERO);
2021        assert!(col.value(4).is_sign_positive());
2022        assert_eq!(col.value(5), f16::NEG_ONE);
2023        assert_eq!(col.value(6), f16::NEG_ZERO);
2024        assert!(col.value(6).is_sign_negative());
2025        assert_eq!(col.value(7), f16_two);
2026    }
2027
2028    #[test]
2029    fn test_read_float16_zeros_file() {
2030        use arrow_array::Float16Array;
2031        let testdata = arrow::util::test_util::parquet_test_data();
2032        // see https://github.com/apache/parquet-testing/pull/40
2033        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2034        let file = File::open(path).unwrap();
2035        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2036
2037        let batch = record_reader.next().unwrap().unwrap();
2038        assert_eq!(batch.num_rows(), 3);
2039        let col = batch
2040            .column(0)
2041            .as_any()
2042            .downcast_ref::<Float16Array>()
2043            .unwrap();
2044
2045        assert_eq!(col.null_count(), 1);
2046        assert!(col.is_null(0));
2047        assert_eq!(col.value(1), f16::ZERO);
2048        assert!(col.value(1).is_sign_positive());
2049        assert!(col.value(2).is_nan());
2050    }
2051
2052    #[test]
2053    fn test_read_float32_float64_byte_stream_split() {
2054        let path = format!(
2055            "{}/byte_stream_split.zstd.parquet",
2056            arrow::util::test_util::parquet_test_data(),
2057        );
2058        let file = File::open(path).unwrap();
2059        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2060
2061        let mut row_count = 0;
2062        for batch in record_reader {
2063            let batch = batch.unwrap();
2064            row_count += batch.num_rows();
2065            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2066            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2067
2068            // This file contains floats from a standard normal distribution
2069            for &x in f32_col.values() {
2070                assert!(x > -10.0);
2071                assert!(x < 10.0);
2072            }
2073            for &x in f64_col.values() {
2074                assert!(x > -10.0);
2075                assert!(x < 10.0);
2076            }
2077        }
2078        assert_eq!(row_count, 300);
2079    }
2080
2081    #[test]
2082    fn test_read_extended_byte_stream_split() {
2083        let path = format!(
2084            "{}/byte_stream_split_extended.gzip.parquet",
2085            arrow::util::test_util::parquet_test_data(),
2086        );
2087        let file = File::open(path).unwrap();
2088        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2089
2090        let mut row_count = 0;
2091        for batch in record_reader {
2092            let batch = batch.unwrap();
2093            row_count += batch.num_rows();
2094
2095            // 0,1 are f16
2096            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2097            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2098            assert_eq!(f16_col.len(), f16_bss.len());
2099            f16_col
2100                .iter()
2101                .zip(f16_bss.iter())
2102                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2103
2104            // 2,3 are f32
2105            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2106            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2107            assert_eq!(f32_col.len(), f32_bss.len());
2108            f32_col
2109                .iter()
2110                .zip(f32_bss.iter())
2111                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2112
2113            // 4,5 are f64
2114            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2115            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2116            assert_eq!(f64_col.len(), f64_bss.len());
2117            f64_col
2118                .iter()
2119                .zip(f64_bss.iter())
2120                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2121
2122            // 6,7 are i32
2123            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2124            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2125            assert_eq!(i32_col.len(), i32_bss.len());
2126            i32_col
2127                .iter()
2128                .zip(i32_bss.iter())
2129                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2130
2131            // 8,9 are i64
2132            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2133            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2134            assert_eq!(i64_col.len(), i64_bss.len());
2135            i64_col
2136                .iter()
2137                .zip(i64_bss.iter())
2138                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2139
2140            // 10,11 are FLBA(5)
2141            let flba_col = batch.column(10).as_fixed_size_binary();
2142            let flba_bss = batch.column(11).as_fixed_size_binary();
2143            assert_eq!(flba_col.len(), flba_bss.len());
2144            flba_col
2145                .iter()
2146                .zip(flba_bss.iter())
2147                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2148
2149            // 12,13 are FLBA(4) (decimal(7,3))
2150            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2151            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2152            assert_eq!(dec_col.len(), dec_bss.len());
2153            dec_col
2154                .iter()
2155                .zip(dec_bss.iter())
2156                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2157        }
2158        assert_eq!(row_count, 200);
2159    }
2160
2161    #[test]
2162    fn test_read_incorrect_map_schema_file() {
2163        let testdata = arrow::util::test_util::parquet_test_data();
2164        // see https://github.com/apache/parquet-testing/pull/47
2165        let path = format!("{testdata}/incorrect_map_schema.parquet");
2166        let file = File::open(path).unwrap();
2167        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2168
2169        let batch = record_reader.next().unwrap().unwrap();
2170        assert_eq!(batch.num_rows(), 1);
2171
2172        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2173            "my_map",
2174            ArrowDataType::Map(
2175                Arc::new(Field::new(
2176                    "key_value",
2177                    ArrowDataType::Struct(Fields::from(vec![
2178                        Field::new("key", ArrowDataType::Utf8, false),
2179                        Field::new("value", ArrowDataType::Utf8, true),
2180                    ])),
2181                    false,
2182                )),
2183                false,
2184            ),
2185            true,
2186        )]));
2187        assert_eq!(batch.schema().as_ref(), &expected_schema);
2188
2189        assert_eq!(batch.num_rows(), 1);
2190        assert_eq!(batch.column(0).null_count(), 0);
2191        assert_eq!(
2192            batch.column(0).as_map().keys().as_ref(),
2193            &StringArray::from(vec!["parent", "name"])
2194        );
2195        assert_eq!(
2196            batch.column(0).as_map().values().as_ref(),
2197            &StringArray::from(vec!["another", "report"])
2198        );
2199    }
2200
2201    /// Parameters for single_column_reader_test
2202    #[derive(Clone)]
2203    struct TestOptions {
2204        /// Number of row groups to write to parquet
2205        /// (each row group contains `num_rows` rows)
2206        num_row_groups: usize,
2207        /// Number of rows in each row group
2208        num_rows: usize,
2209        /// Size of batches to read back
2210        record_batch_size: usize,
2211        /// Percentage of nulls in the column, or None if the column is required (non-nullable)
2212        null_percent: Option<usize>,
2213        /// Set write batch size
2214        ///
2215        /// This is the number of rows that are written at once to a page and
2216        /// therefore acts as a bound on the page granularity of a row group
2217        write_batch_size: usize,
2218        /// Maximum size of page in bytes
2219        max_data_page_size: usize,
2220        /// Maximum size of dictionary page in bytes
2221        max_dict_page_size: usize,
2222        /// Writer version
2223        writer_version: WriterVersion,
2224        /// Enabled statistics
2225        enabled_statistics: EnabledStatistics,
2226        /// Encoding
2227        encoding: Encoding,
2228        /// row selections and total selected row count
2229        row_selections: Option<(RowSelection, usize)>,
2230        /// row filter
2231        row_filter: Option<Vec<bool>>,
2232        /// limit
2233        limit: Option<usize>,
2234        /// offset
2235        offset: Option<usize>,
2236    }
2237
2238    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
2239    impl std::fmt::Debug for TestOptions {
2240        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2241            f.debug_struct("TestOptions")
2242                .field("num_row_groups", &self.num_row_groups)
2243                .field("num_rows", &self.num_rows)
2244                .field("record_batch_size", &self.record_batch_size)
2245                .field("null_percent", &self.null_percent)
2246                .field("write_batch_size", &self.write_batch_size)
2247                .field("max_data_page_size", &self.max_data_page_size)
2248                .field("max_dict_page_size", &self.max_dict_page_size)
2249                .field("writer_version", &self.writer_version)
2250                .field("enabled_statistics", &self.enabled_statistics)
2251                .field("encoding", &self.encoding)
2252                .field("row_selections", &self.row_selections.is_some())
2253                .field("row_filter", &self.row_filter.is_some())
2254                .field("limit", &self.limit)
2255                .field("offset", &self.offset)
2256                .finish()
2257        }
2258    }
2259
2260    impl Default for TestOptions {
2261        fn default() -> Self {
2262            Self {
2263                num_row_groups: 2,
2264                num_rows: 100,
2265                record_batch_size: 15,
2266                null_percent: None,
2267                write_batch_size: 64,
2268                max_data_page_size: 1024 * 1024,
2269                max_dict_page_size: 1024 * 1024,
2270                writer_version: WriterVersion::PARQUET_1_0,
2271                enabled_statistics: EnabledStatistics::Page,
2272                encoding: Encoding::PLAIN,
2273                row_selections: None,
2274                row_filter: None,
2275                limit: None,
2276                offset: None,
2277            }
2278        }
2279    }
2280
2281    impl TestOptions {
2282        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2283            Self {
2284                num_row_groups,
2285                num_rows,
2286                record_batch_size,
2287                ..Default::default()
2288            }
2289        }
2290
2291        fn with_null_percent(self, null_percent: usize) -> Self {
2292            Self {
2293                null_percent: Some(null_percent),
2294                ..self
2295            }
2296        }
2297
2298        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2299            Self {
2300                max_data_page_size,
2301                ..self
2302            }
2303        }
2304
2305        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2306            Self {
2307                max_dict_page_size,
2308                ..self
2309            }
2310        }
2311
2312        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2313            Self {
2314                enabled_statistics,
2315                ..self
2316            }
2317        }
2318
2319        fn with_row_selections(self) -> Self {
2320            assert!(self.row_filter.is_none(), "Must set row selection before row filter");
2321
2322            let mut rng = rng();
2323            let step = rng.random_range(self.record_batch_size..self.num_rows);
2324            let row_selections = create_test_selection(
2325                step,
2326                self.num_row_groups * self.num_rows,
2327                rng.random::<bool>(),
2328            );
2329            Self {
2330                row_selections: Some(row_selections),
2331                ..self
2332            }
2333        }
2334
2335        fn with_row_filter(self) -> Self {
2336            let row_count = match &self.row_selections {
2337                Some((_, count)) => *count,
2338                None => self.num_row_groups * self.num_rows,
2339            };
2340
2341            let mut rng = rng();
2342            Self {
2343                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2344                ..self
2345            }
2346        }
2347
2348        fn with_limit(self, limit: usize) -> Self {
2349            Self {
2350                limit: Some(limit),
2351                ..self
2352            }
2353        }
2354
2355        fn with_offset(self, offset: usize) -> Self {
2356            Self {
2357                offset: Some(offset),
2358                ..self
2359            }
2360        }
2361
2362        fn writer_props(&self) -> WriterProperties {
2363            let builder = WriterProperties::builder()
2364                .set_data_page_size_limit(self.max_data_page_size)
2365                .set_write_batch_size(self.write_batch_size)
2366                .set_writer_version(self.writer_version)
2367                .set_statistics_enabled(self.enabled_statistics);
2368
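            // Dictionary encodings cannot be passed to `set_encoding`; they are enabled
            // via the dictionary flag, with the dictionary page size limit controlling
            // when the writer falls back to a non-dictionary encoding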
2369            let builder = match self.encoding {
2370                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2371                    .set_dictionary_enabled(true)
2372                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2373                _ => builder
2374                    .set_dictionary_enabled(false)
2375                    .set_encoding(self.encoding),
2376            };
2377
2378            builder.build()
2379        }
2380    }
2381
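    /// A minimal sketch (not part of the original suite) of the offset / limit
    /// handling exercised by `TestOptions`, relying only on helpers already used by
    /// this module: the offset is applied before the limit, so an offset of 2 and a
    /// limit of 3 yields rows 2..5.
    #[test]
    fn test_offset_limit_sketch() {
        let values = Int32Array::from_iter_values(0..10);
        let written =
            RecordBatch::try_from_iter([("int32", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        // With only 10 rows and the default batch size everything comes back in one batch
        let read = reader.next().unwrap().unwrap();
        assert_eq!(read, written.slice(2, 3));
        assert!(reader.next().is_none());
    }
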
2382    /// Create parquet files and then read them back using
2383    /// `ParquetRecordBatchReader`, iterating over a standard set of
2384    /// `TestOptions` configurations.
2385    ///
2386    /// `rand_max` represents the maximum size of value to pass to the
2387    /// value generator
2388    fn run_single_column_reader_tests<T, F, G>(
2389        rand_max: i32,
2390        converted_type: ConvertedType,
2391        arrow_type: Option<ArrowDataType>,
2392        converter: F,
2393        encodings: &[Encoding],
2394    ) where
2395        T: DataType,
2396        G: RandGen<T>,
2397        F: Fn(&[Option<T::T>]) -> ArrayRef,
2398    {
2399        let all_options = vec![
2400            // choose record_batch_size (15) so batches cross row
2401            // group boundaries (100 rows in each of 2 row groups).
2402            TestOptions::new(2, 100, 15),
2403            // choose record_batch_size (5) so batches sometimes fall
2404            // exactly on row group boundaries (25 rows in each of 3
2405            // row groups, evenly divisible by the batch size). Tests
2406            // buffer refilling edge cases.
2407            TestOptions::new(3, 25, 5),
2408            // Choose record_batch_size (25) so no batch crosses a row
2409            // group boundary (100 rows in each of 4 row groups). Tests
2410            // buffer refilling edge cases.
2411            TestOptions::new(4, 100, 25),
2412            // Set maximum page size so row groups have multiple pages
2413            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2414            // Set small dictionary page size to test dictionary fallback
2415            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2416            // Test optional but with no nulls
2417            TestOptions::new(2, 256, 127).with_null_percent(0),
2418            // Test optional with nulls
2419            TestOptions::new(2, 256, 93).with_null_percent(25),
2420            // Test with limit of 0
2421            TestOptions::new(4, 100, 25).with_limit(0),
2422            // Test with limit of 50
2423            TestOptions::new(4, 100, 25).with_limit(50),
2424            // Test with limit less than number of rows
2425            TestOptions::new(4, 100, 25).with_limit(10),
2426            // Test with limit larger than number of rows
2427            TestOptions::new(4, 100, 25).with_limit(101),
2428            // Test with limit + offset less than number of rows
2429            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2430            // Test with limit + offset equal to number of rows
2431            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2432            // Test with limit + offset larger than number of rows
2433            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2434            // Test with no page-level statistics
2435            TestOptions::new(2, 256, 91)
2436                .with_null_percent(25)
2437                .with_enabled_statistics(EnabledStatistics::Chunk),
2438            // Test with no statistics
2439            TestOptions::new(2, 256, 91)
2440                .with_null_percent(25)
2441                .with_enabled_statistics(EnabledStatistics::None),
2442            // Test with all null
2443            TestOptions::new(2, 128, 91)
2444                .with_null_percent(100)
2445                .with_enabled_statistics(EnabledStatistics::None),
2446            // Test skip
2447
2448            // choose record_batch_size (15) so batches cross row
2449            // group boundaries (100 rows in each of 2 row groups).
2450            TestOptions::new(2, 100, 15).with_row_selections(),
2451            // choose record_batch_size (5) so batches sometimes fall
2452            // exactly on row group boundaries (25 rows in each of 3
2453            // row groups, evenly divisible by the batch size). Tests
2454            // buffer refilling edge cases.
2455            TestOptions::new(3, 25, 5).with_row_selections(),
2456            // Choose record_batch_size (25) so no batch crosses a row
2457            // group boundary (100 rows in each of 4 row groups). Tests
2458            // buffer refilling edge cases.
2459            TestOptions::new(4, 100, 25).with_row_selections(),
2460            // Set maximum page size so row groups have multiple pages
2461            TestOptions::new(3, 256, 73)
2462                .with_max_data_page_size(128)
2463                .with_row_selections(),
2464            // Set small dictionary page size to test dictionary fallback
2465            TestOptions::new(3, 256, 57)
2466                .with_max_dict_page_size(128)
2467                .with_row_selections(),
2468            // Test optional but with no nulls
2469            TestOptions::new(2, 256, 127)
2470                .with_null_percent(0)
2471                .with_row_selections(),
2472            // Test optional with nulls
2473            TestOptions::new(2, 256, 93)
2474                .with_null_percent(25)
2475                .with_row_selections(),
2476            // Test optional with nulls
2477            TestOptions::new(2, 256, 93)
2478                .with_null_percent(25)
2479                .with_row_selections()
2480                .with_limit(10),
2481            // Test optional with nulls
2482            TestOptions::new(2, 256, 93)
2483                .with_null_percent(25)
2484                .with_row_selections()
2485                .with_offset(20)
2486                .with_limit(10),
2487            // Test filter
2488
2489            // Test with row filter
2490            TestOptions::new(4, 100, 25).with_row_filter(),
2491            // Test with row selection and row filter
2492            TestOptions::new(4, 100, 25)
2493                .with_row_selections()
2494                .with_row_filter(),
2495            // Test with nulls and row filter
2496            TestOptions::new(2, 256, 93)
2497                .with_null_percent(25)
2498                .with_max_data_page_size(10)
2499                .with_row_filter(),
2500            // Test with nulls and row filter and small pages
2501            TestOptions::new(2, 256, 93)
2502                .with_null_percent(25)
2503                .with_max_data_page_size(10)
2504                .with_row_selections()
2505                .with_row_filter(),
2506            // Test with row selection and no offset index and small pages
2507            TestOptions::new(2, 256, 93)
2508                .with_enabled_statistics(EnabledStatistics::None)
2509                .with_max_data_page_size(10)
2510                .with_row_selections(),
2511        ];
2512
2513        all_options.into_iter().for_each(|opts| {
2514            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2515                for encoding in encodings {
2516                    let opts = TestOptions {
2517                        writer_version,
2518                        encoding: *encoding,
2519                        ..opts.clone()
2520                    };
2521
2522                    single_column_reader_test::<T, _, G>(
2523                        opts,
2524                        rand_max,
2525                        converted_type,
2526                        arrow_type.clone(),
2527                        &converter,
2528                    )
2529                }
2530            }
2531        });
2532    }
2533
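    /// A minimal sketch (not part of the original suite) of the `RowFilter` /
    /// `ArrowPredicateFn` plumbing used by `single_column_reader_test` below,
    /// relying only on helpers already used by this module: the predicate is handed
    /// each batch of the projected columns and returns one boolean per row, and
    /// only rows evaluating to true are decoded.
    #[test]
    fn test_row_filter_sketch() {
        let values = Int32Array::from_iter_values(0..10);
        let written =
            RecordBatch::try_from_iter([("int32", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Keep only the rows whose value is even
        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
            let col = batch.column(0).as_primitive::<types::Int32Type>();
            Ok(BooleanArray::from_iter(
                col.iter().map(|v| v.map(|v| v % 2 == 0)),
            ))
        });

        let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .build()
            .unwrap();

        let mut read = vec![];
        for batch in reader {
            let batch = batch.unwrap();
            read.extend_from_slice(batch.column(0).as_primitive::<types::Int32Type>().values());
        }
        assert_eq!(read, vec![0, 2, 4, 6, 8]);
    }
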
2534    /// Create a parquet file and then read it back using
2535    /// `ParquetRecordBatchReader` with the parameters described in
2536    /// `opts`.
2537    fn single_column_reader_test<T, F, G>(
2538        opts: TestOptions,
2539        rand_max: i32,
2540        converted_type: ConvertedType,
2541        arrow_type: Option<ArrowDataType>,
2542        converter: F,
2543    ) where
2544        T: DataType,
2545        G: RandGen<T>,
2546        F: Fn(&[Option<T::T>]) -> ArrayRef,
2547    {
2548        // Print out options to facilitate debugging failures on CI
2549        println!(
2550            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2551            T::get_physical_type(), converted_type, arrow_type, opts
2552        );
2553
2554        // Generate def_levels according to null_percent
2555        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2556            Some(null_percent) => {
2557                let mut rng = rng();
2558
2559                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2560                    .map(|_| {
2561                        std::iter::from_fn(|| {
2562                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2563                        })
2564                        .take(opts.num_rows)
2565                        .collect()
2566                    })
2567                    .collect();
2568                (Repetition::OPTIONAL, Some(def_levels))
2569            }
2570            None => (Repetition::REQUIRED, None),
2571        };
2572
2573        // Generate random table data
2574        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2575            .map(|idx| {
2576                let null_count = match def_levels.as_ref() {
2577                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2578                    None => 0,
2579                };
2580                G::gen_vec(rand_max, opts.num_rows - null_count)
2581            })
2582            .collect();
2583
2584        let len = match T::get_physical_type() {
2585            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2586            crate::basic::Type::INT96 => 12,
2587            _ => -1,
2588        };
2589
2590        let fields = vec![Arc::new(
2591            Type::primitive_type_builder("leaf", T::get_physical_type())
2592                .with_repetition(repetition)
2593                .with_converted_type(converted_type)
2594                .with_length(len)
2595                .build()
2596                .unwrap(),
2597        )];
2598
2599        let schema = Arc::new(
2600            Type::group_type_builder("test_schema")
2601                .with_fields(fields)
2602                .build()
2603                .unwrap(),
2604        );
2605
2606        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2607
2608        let mut file = tempfile::tempfile().unwrap();
2609
2610        generate_single_column_file_with_data::<T>(
2611            &values,
2612            def_levels.as_ref(),
2613            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2614            schema,
2615            arrow_field,
2616            &opts,
2617        )
2618        .unwrap();
2619
2620        file.rewind().unwrap();
2621
2622        let options = ArrowReaderOptions::new()
2623            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2624
2625        let mut builder =
2626            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2627
2628        let expected_data = match opts.row_selections {
2629            Some((selections, row_count)) => {
2630                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2631
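                // Note: despite its name, `skip_data` accumulates the rows that the
                // selection *keeps*; skipped rows are drained and discarded below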
2632                let mut skip_data: Vec<Option<T::T>> = vec![];
2633                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2634                for select in dequeue {
2635                    if select.skip {
2636                        without_skip_data.drain(0..select.row_count);
2637                    } else {
2638                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2639                    }
2640                }
2641                builder = builder.with_row_selection(selections);
2642
2643                assert_eq!(skip_data.len(), row_count);
2644                skip_data
2645            }
2646            None => {
2647                // Get the flattened table data
2648                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2649                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2650                expected_data
2651            }
2652        };
2653
2654        let mut expected_data = match opts.row_filter {
2655            Some(filter) => {
2656                let expected_data = expected_data
2657                    .into_iter()
2658                    .zip(filter.iter())
2659                    .filter_map(|(d, f)| f.then_some(d))
2660                    .collect();
2661
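                // The predicate below is invoked once for each batch of rows the reader
                // evaluates, in order; `filter_offset` tracks how far through the
                // precomputed boolean mask the previously evaluated batches have advanced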
2662                let mut filter_offset = 0;
2663                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2664                    ProjectionMask::all(),
2665                    move |b| {
2666                        let array = BooleanArray::from_iter(
2667                            filter
2668                                .iter()
2669                                .skip(filter_offset)
2670                                .take(b.num_rows())
2671                                .map(|x| Some(*x)),
2672                        );
2673                        filter_offset += b.num_rows();
2674                        Ok(array)
2675                    },
2676                ))]);
2677
2678                builder = builder.with_row_filter(filter);
2679                expected_data
2680            }
2681            None => expected_data,
2682        };
2683
2684        if let Some(offset) = opts.offset {
2685            builder = builder.with_offset(offset);
2686            expected_data = expected_data.into_iter().skip(offset).collect();
2687        }
2688
2689        if let Some(limit) = opts.limit {
2690            builder = builder.with_limit(limit);
2691            expected_data = expected_data.into_iter().take(limit).collect();
2692        }
2693
2694        let mut record_reader = builder
2695            .with_batch_size(opts.record_batch_size)
2696            .build()
2697            .unwrap();
2698
2699        let mut total_read = 0;
2700        loop {
2701            let maybe_batch = record_reader.next();
2702            if total_read < expected_data.len() {
2703                let end = min(total_read + opts.record_batch_size, expected_data.len());
2704                let batch = maybe_batch.unwrap().unwrap();
2705                assert_eq!(end - total_read, batch.num_rows());
2706
2707                let a = converter(&expected_data[total_read..end]);
2708                let b = Arc::clone(batch.column(0));
2709
2710                assert_eq!(a.data_type(), b.data_type());
2711                assert_eq!(a.to_data(), b.to_data());
2712                assert_eq!(
2713                    a.as_any().type_id(),
2714                    b.as_any().type_id(),
2715                    "incorrect type ids"
2716                );
2717
2718                total_read = end;
2719            } else {
2720                assert!(maybe_batch.is_none());
2721                break;
2722            }
2723        }
2724    }
2725
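    /// A minimal sketch (not part of the original suite) of the `RowSelection`
    /// semantics applied above, relying only on helpers already used by this
    /// module: selectors are consumed in order, alternately skipping and selecting
    /// runs of rows.
    #[test]
    fn test_row_selection_sketch() {
        let values = Int32Array::from_iter_values(0..10);
        let written =
            RecordBatch::try_from_iter([("int32", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Skip rows 0..3, select rows 3..7, skip the remaining rows
        let selection = RowSelection::from(vec![
            RowSelector::skip(3),
            RowSelector::select(4),
            RowSelector::skip(3),
        ]);

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        // With only 10 rows and the default batch size the selection comes back in one batch
        let read = reader.next().unwrap().unwrap();
        assert_eq!(read, written.slice(3, 4));
        assert!(reader.next().is_none());
    }
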
2726    fn gen_expected_data<T: DataType>(
2727        def_levels: Option<&Vec<Vec<i16>>>,
2728        values: &[Vec<T::T>],
2729    ) -> Vec<Option<T::T>> {
2730        let data: Vec<Option<T::T>> = match def_levels {
2731            Some(levels) => {
2732                let mut values_iter = values.iter().flatten();
2733                levels
2734                    .iter()
2735                    .flatten()
2736                    .map(|d| match d {
2737                        1 => Some(values_iter.next().cloned().unwrap()),
2738                        0 => None,
2739                        _ => unreachable!(),
2740                    })
2741                    .collect()
2742            }
2743            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2744        };
2745        data
2746    }
2747
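    /// A small sketch (not part of the original suite) of how `gen_expected_data`
    /// interleaves definition levels and values: a definition level of 1 consumes
    /// the next value, a definition level of 0 produces a null.
    #[test]
    fn test_gen_expected_data_sketch() {
        let def_levels = vec![vec![1, 0, 1], vec![1, 0]];
        let values = vec![vec![10, 20], vec![30]];
        let expected = gen_expected_data::<Int32Type>(Some(&def_levels), &values);
        assert_eq!(expected, vec![Some(10), None, Some(20), Some(30), None]);
    }
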
2748    fn generate_single_column_file_with_data<T: DataType>(
2749        values: &[Vec<T::T>],
2750        def_levels: Option<&Vec<Vec<i16>>>,
2751        file: File,
2752        schema: TypePtr,
2753        field: Option<Field>,
2754        opts: &TestOptions,
2755    ) -> Result<crate::format::FileMetaData> {
2756        let mut writer_props = opts.writer_props();
2757        if let Some(field) = field {
2758            let arrow_schema = Schema::new(vec![field]);
2759            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2760        }
2761
2762        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2763
2764        for (idx, v) in values.iter().enumerate() {
2765            let def_levels = def_levels.map(|d| d[idx].as_slice());
2766            let mut row_group_writer = writer.next_row_group()?;
2767            {
2768                let mut column_writer = row_group_writer
2769                    .next_column()?
2770                    .expect("Column writer is none!");
2771
2772                column_writer
2773                    .typed::<T>()
2774                    .write_batch(v, def_levels, None)?;
2775
2776                column_writer.close()?;
2777            }
2778            row_group_writer.close()?;
2779        }
2780
2781        writer.close()
2782    }
2783
2784    fn get_test_file(file_name: &str) -> File {
2785        let mut path = PathBuf::new();
2786        path.push(arrow::util::test_util::arrow_test_data());
2787        path.push(file_name);
2788
2789        File::open(path.as_path()).expect("File not found!")
2790    }
2791
2792    #[test]
2793    fn test_read_structs() {
2794        // This particular test file has columns of struct types where there is
2795        // a column that has the same name as one of the struct fields
2796        // (see: ARROW-11452)
2797        let testdata = arrow::util::test_util::parquet_test_data();
2798        let path = format!("{testdata}/nested_structs.rust.parquet");
2799        let file = File::open(&path).unwrap();
2800        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2801
2802        for batch in record_batch_reader {
2803            batch.unwrap();
2804        }
2805
2806        let file = File::open(&path).unwrap();
2807        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2808
2809        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2810        let projected_reader = builder
2811            .with_projection(mask)
2812            .with_batch_size(60)
2813            .build()
2814            .unwrap();
2815
2816        let expected_schema = Schema::new(vec![
2817            Field::new(
2818                "roll_num",
2819                ArrowDataType::Struct(Fields::from(vec![Field::new(
2820                    "count",
2821                    ArrowDataType::UInt64,
2822                    false,
2823                )])),
2824                false,
2825            ),
2826            Field::new(
2827                "PC_CUR",
2828                ArrowDataType::Struct(Fields::from(vec![
2829                    Field::new("mean", ArrowDataType::Int64, false),
2830                    Field::new("sum", ArrowDataType::Int64, false),
2831                ])),
2832                false,
2833            ),
2834        ]);
2835
2836        // Tests for #1652 and #1654
2837        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2838
2839        for batch in projected_reader {
2840            let batch = batch.unwrap();
2841            assert_eq!(batch.schema().as_ref(), &expected_schema);
2842        }
2843    }
2844
2845    #[test]
2846    // same as test_read_structs but constructs projection mask via column names
2847    fn test_read_structs_by_name() {
2848        let testdata = arrow::util::test_util::parquet_test_data();
2849        let path = format!("{testdata}/nested_structs.rust.parquet");
2850        let file = File::open(&path).unwrap();
2851        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2852
2853        for batch in record_batch_reader {
2854            batch.unwrap();
2855        }
2856
2857        let file = File::open(&path).unwrap();
2858        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2859
2860        let mask = ProjectionMask::columns(
2861            builder.parquet_schema(),
2862            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2863        );
2864        let projected_reader = builder
2865            .with_projection(mask)
2866            .with_batch_size(60)
2867            .build()
2868            .unwrap();
2869
2870        let expected_schema = Schema::new(vec![
2871            Field::new(
2872                "roll_num",
2873                ArrowDataType::Struct(Fields::from(vec![Field::new(
2874                    "count",
2875                    ArrowDataType::UInt64,
2876                    false,
2877                )])),
2878                false,
2879            ),
2880            Field::new(
2881                "PC_CUR",
2882                ArrowDataType::Struct(Fields::from(vec![
2883                    Field::new("mean", ArrowDataType::Int64, false),
2884                    Field::new("sum", ArrowDataType::Int64, false),
2885                ])),
2886                false,
2887            ),
2888        ]);
2889
2890        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2891
2892        for batch in projected_reader {
2893            let batch = batch.unwrap();
2894            assert_eq!(batch.schema().as_ref(), &expected_schema);
2895        }
2896    }
2897
2898    #[test]
2899    fn test_read_maps() {
2900        let testdata = arrow::util::test_util::parquet_test_data();
2901        let path = format!("{testdata}/nested_maps.snappy.parquet");
2902        let file = File::open(path).unwrap();
2903        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2904
2905        for batch in record_batch_reader {
2906            batch.unwrap();
2907        }
2908    }
2909
2910    #[test]
2911    fn test_nested_nullability() {
2912        let message_type = "message nested {
2913          OPTIONAL Group group {
2914            REQUIRED INT32 leaf;
2915          }
2916        }";
2917
2918        let file = tempfile::tempfile().unwrap();
2919        let schema = Arc::new(parse_message_type(message_type).unwrap());
2920
2921        {
2922            // Write using low-level parquet API (#1167)
2923            let mut writer =
2924                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2925                    .unwrap();
2926
2927            {
2928                let mut row_group_writer = writer.next_row_group().unwrap();
2929                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2930
2931                column_writer
2932                    .typed::<Int32Type>()
2933                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2934                    .unwrap();
2935
2936                column_writer.close().unwrap();
2937                row_group_writer.close().unwrap();
2938            }
2939
2940            writer.close().unwrap();
2941        }
2942
2943        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2944        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2945
2946        let reader = builder.with_projection(mask).build().unwrap();
2947
2948        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2949            "group",
2950            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2951            true,
2952        )]));
2953
2954        let batch = reader.into_iter().next().unwrap().unwrap();
2955        assert_eq!(batch.schema().as_ref(), &expected_schema);
2956        assert_eq!(batch.num_rows(), 4);
2957        assert_eq!(batch.column(0).null_count(), 2);
2958    }
2959
2960    #[test]
2961    fn test_invalid_utf8() {
2962        // a parquet file with 1 column with invalid utf8
2963        let data = vec![
2964            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2965            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2966            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2967            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2968            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2969            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2970            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2971            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2972            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2973            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2974            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2975            82, 49,
2976        ];
2977
2978        let file = Bytes::from(data);
2979        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2980
2981        let error = record_batch_reader.next().unwrap().unwrap_err();
2982
2983        assert!(
2984            error.to_string().contains("invalid utf-8 sequence"),
2985            "{}",
2986            error
2987        );
2988    }
2989
2990    #[test]
2991    fn test_invalid_utf8_string_array() {
2992        test_invalid_utf8_string_array_inner::<i32>();
2993    }
2994
2995    #[test]
2996    fn test_invalid_utf8_large_string_array() {
2997        test_invalid_utf8_string_array_inner::<i64>();
2998    }
2999
3000    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3001        let cases = [
3002            invalid_utf8_first_char::<O>(),
3003            invalid_utf8_first_char_long_strings::<O>(),
3004            invalid_utf8_later_char::<O>(),
3005            invalid_utf8_later_char_long_strings::<O>(),
3006            invalid_utf8_later_char_really_long_strings::<O>(),
3007            invalid_utf8_later_char_really_long_strings2::<O>(),
3008        ];
3009        for array in &cases {
3010            for encoding in STRING_ENCODINGS {
3011                // the data is not valid utf8, so we can not safely construct a correct
3012                // StringArray; purposely create an invalid StringArray instead
3013                let array = unsafe {
3014                    GenericStringArray::<O>::new_unchecked(
3015                        array.offsets().clone(),
3016                        array.values().clone(),
3017                        array.nulls().cloned(),
3018                    )
3019                };
3020                let data_type = array.data_type().clone();
3021                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3022                let err = read_from_parquet(data).unwrap_err();
3023                let expected_err =
3024                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3025                assert!(
3026                    err.to_string().contains(expected_err),
3027                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3028                );
3029            }
3030        }
3031    }
3032
3033    #[test]
3034    fn test_invalid_utf8_string_view_array() {
3035        let cases = [
3036            invalid_utf8_first_char::<i32>(),
3037            invalid_utf8_first_char_long_strings::<i32>(),
3038            invalid_utf8_later_char::<i32>(),
3039            invalid_utf8_later_char_long_strings::<i32>(),
3040            invalid_utf8_later_char_really_long_strings::<i32>(),
3041            invalid_utf8_later_char_really_long_strings2::<i32>(),
3042        ];
3043
3044        for encoding in STRING_ENCODINGS {
3045            for array in &cases {
3046                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3047                let array = array.as_binary_view();
3048
3049                // the data is not valid utf8, so we can not safely construct a correct
3050                // StringViewArray; purposely create an invalid StringViewArray instead
3051                let array = unsafe {
3052                    StringViewArray::new_unchecked(
3053                        array.views().clone(),
3054                        array.data_buffers().to_vec(),
3055                        array.nulls().cloned(),
3056                    )
3057                };
3058
3059                let data_type = array.data_type().clone();
3060                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3061                let err = read_from_parquet(data).unwrap_err();
3062                let expected_err =
3063                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3064                assert!(
3065                    err.to_string().contains(expected_err),
3066                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3067                );
3068            }
3069        }
3070    }
3071
3072    /// Encodings suitable for string data
3073    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3074        None,
3075        Some(Encoding::PLAIN),
3076        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3077        Some(Encoding::DELTA_BYTE_ARRAY),
3078    ];
3079
3080    /// Invalid UTF-8 sequence in the first character
3081    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3082    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3083
3084    /// Invalid UTF-8 sequence in a character other than the first
3085    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3086    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3087
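    // Illustrative sketch (not part of the original test suite): confirms that the two
    // constants above are rejected by Rust's standard UTF-8 validation, which is the
    // same property the readers under test must detect.
    #[test]
    fn test_invalid_utf8_constants_are_rejected_by_std() {
        assert!(std::str::from_utf8(INVALID_UTF8_FIRST_CHAR).is_err());
        let err = std::str::from_utf8(INVALID_UTF8_LATER_CHAR).unwrap_err();
        // the three leading spaces are valid, the 0xa0 byte is not
        assert_eq!(err.valid_up_to(), 3);
    }
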
3088    /// returns a BinaryArray with invalid UTF8 data in the first character
3089    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3090        let valid: &[u8] = b"   ";
3091        let invalid = INVALID_UTF8_FIRST_CHAR;
3092        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3093    }
3094
3095    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3096    /// string larger than 12 bytes which is handled specially when reading
3097    /// `ByteViewArray`s
3098    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3099        let valid: &[u8] = b"   ";
3100        let mut invalid = vec![];
3101        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3102        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3103        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3104    }
3105
3106    /// returns a BinaryArray with invalid UTF8 data in a character other than
3107    /// the first (this is checked in a special codepath)
3108    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3109        let valid: &[u8] = b"   ";
3110        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3111        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3112    }
3113
3114    /// returns a BinaryArray with invalid UTF8 data in a character other than
3115    /// the first in a string larger than 12 bytes which is handled specially
3116    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3117    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3118        let valid: &[u8] = b"   ";
3119        let mut invalid = vec![];
3120        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3121        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3122        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3123    }
3124
3125    /// returns a BinaryArray with invalid UTF8 data in a character other than
3126    /// the first in a string larger than 128 bytes which is handled specially
3127    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3128    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3129        let valid: &[u8] = b"   ";
3130        let mut invalid = vec![];
3131        for _ in 0..10 {
3132            // each instance is 38 bytes
3133            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3134        }
3135        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3136        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3137    }
3138
3139    /// returns a BinaryArray with a short value containing invalid UTF8 data in a
3140    /// character other than the first, mixed with valid values, one longer than 128 bytes
3141    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3142        let valid: &[u8] = b"   ";
3143        let mut valid_long = vec![];
3144        for _ in 0..10 {
3145            // each instance is 38 bytes
3146            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3147        }
3148        let invalid = INVALID_UTF8_LATER_CHAR;
3149        GenericBinaryArray::<O>::from_iter(vec![
3150            None,
3151            Some(valid),
3152            Some(invalid),
3153            None,
3154            Some(&valid_long),
3155            Some(valid),
3156        ])
3157    }
3158
3159    /// writes the array into a single column parquet file with the specified
3160    /// encoding.
3161    ///
3162    /// If no encoding is specified, the default (dictionary) encoding is used
3163    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3164        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3165        let mut data = vec![];
3166        let schema = batch.schema();
3167        let props = encoding.map(|encoding| {
3168            WriterProperties::builder()
3169                // must disable dictionary encoding to actually use the specified encoding
3170                .set_dictionary_enabled(false)
3171                .set_encoding(encoding)
3172                .build()
3173        });
3174
3175        {
3176            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3177            writer.write(&batch).unwrap();
3178            writer.flush().unwrap();
3179            writer.close().unwrap();
3180        };
3181        data
3182    }
3183
3184    /// reads the parquet file into record batches
3185    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3186        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3187            .unwrap()
3188            .build()
3189            .unwrap();
3190
3191        reader.collect()
3192    }
3193
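    // Illustrative sketch (not part of the original test suite): valid UTF-8 data
    // round trips through the same `write_to_parquet_with_encoding` /
    // `read_from_parquet` helpers for every entry in STRING_ENCODINGS, in contrast
    // to the invalid UTF-8 cases above which must error.
    #[test]
    fn test_valid_utf8_roundtrip_for_string_encodings() {
        let array: ArrayRef = Arc::new(StringArray::from(vec!["hello", "parquet", "arrow"]));
        for encoding in STRING_ENCODINGS {
            let data = write_to_parquet_with_encoding(array.clone(), *encoding);
            let batches = read_from_parquet(data).unwrap();
            assert_eq!(batches.len(), 1, "encoding: {encoding:?}");
            assert_eq!(batches[0].column(0), &array, "encoding: {encoding:?}");
        }
    }
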
3194    #[test]
3195    fn test_dictionary_preservation() {
3196        let fields = vec![Arc::new(
3197            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3198                .with_repetition(Repetition::OPTIONAL)
3199                .with_converted_type(ConvertedType::UTF8)
3200                .build()
3201                .unwrap(),
3202        )];
3203
3204        let schema = Arc::new(
3205            Type::group_type_builder("test_schema")
3206                .with_fields(fields)
3207                .build()
3208                .unwrap(),
3209        );
3210
3211        let dict_type = ArrowDataType::Dictionary(
3212            Box::new(ArrowDataType::Int32),
3213            Box::new(ArrowDataType::Utf8),
3214        );
3215
3216        let arrow_field = Field::new("leaf", dict_type, true);
3217
3218        let mut file = tempfile::tempfile().unwrap();
3219
3220        let values = vec![
3221            vec![
3222                ByteArray::from("hello"),
3223                ByteArray::from("a"),
3224                ByteArray::from("b"),
3225                ByteArray::from("d"),
3226            ],
3227            vec![
3228                ByteArray::from("c"),
3229                ByteArray::from("a"),
3230                ByteArray::from("b"),
3231            ],
3232        ];
3233
3234        let def_levels = vec![
3235            vec![1, 0, 0, 1, 0, 0, 1, 1],
3236            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3237        ];
3238
3239        let opts = TestOptions {
3240            encoding: Encoding::RLE_DICTIONARY,
3241            ..Default::default()
3242        };
3243
3244        generate_single_column_file_with_data::<ByteArrayType>(
3245            &values,
3246            Some(&def_levels),
3247            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3248            schema,
3249            Some(arrow_field),
3250            &opts,
3251        )
3252        .unwrap();
3253
3254        file.rewind().unwrap();
3255
3256        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3257
3258        let batches = record_reader
3259            .collect::<Result<Vec<RecordBatch>, _>>()
3260            .unwrap();
3261
3262        assert_eq!(batches.len(), 6);
3263        assert!(batches.iter().all(|x| x.num_columns() == 1));
3264
3265        let row_counts = batches
3266            .iter()
3267            .map(|x| (x.num_rows(), x.column(0).null_count()))
3268            .collect::<Vec<_>>();
3269
3270        assert_eq!(
3271            row_counts,
3272            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3273        );
3274
3275        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3276
3277        // First and second batch in same row group -> same dictionary
3278        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3279        // Third batch spans the row group boundary -> computed dictionary
3280        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3281        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3282        // Fourth, fifth and sixth from same row group -> same dictionary
3283        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3284        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3285    }
3286
3287    #[test]
3288    fn test_read_null_list() {
3289        let testdata = arrow::util::test_util::parquet_test_data();
3290        let path = format!("{testdata}/null_list.parquet");
3291        let file = File::open(path).unwrap();
3292        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3293
3294        let batch = record_batch_reader.next().unwrap().unwrap();
3295        assert_eq!(batch.num_rows(), 1);
3296        assert_eq!(batch.num_columns(), 1);
3297        assert_eq!(batch.column(0).len(), 1);
3298
3299        let list = batch
3300            .column(0)
3301            .as_any()
3302            .downcast_ref::<ListArray>()
3303            .unwrap();
3304        assert_eq!(list.len(), 1);
3305        assert!(list.is_valid(0));
3306
3307        let val = list.value(0);
3308        assert_eq!(val.len(), 0);
3309    }
3310
3311    #[test]
3312    fn test_null_schema_inference() {
3313        let testdata = arrow::util::test_util::parquet_test_data();
3314        let path = format!("{testdata}/null_list.parquet");
3315        let file = File::open(path).unwrap();
3316
3317        let arrow_field = Field::new(
3318            "emptylist",
3319            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3320            true,
3321        );
3322
3323        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3324        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3325        let schema = builder.schema();
3326        assert_eq!(schema.fields().len(), 1);
3327        assert_eq!(schema.field(0), &arrow_field);
3328    }
3329
3330    #[test]
3331    fn test_skip_metadata() {
3332        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3333        let field = Field::new("col", col.data_type().clone(), true);
3334
3335        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3336
3337        let metadata = [("key".to_string(), "value".to_string())]
3338            .into_iter()
3339            .collect();
3340
3341        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3342
3343        assert_ne!(schema_with_metadata, schema_without_metadata);
3344
3345        let batch =
3346            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3347
3348        let file = |version: WriterVersion| {
3349            let props = WriterProperties::builder()
3350                .set_writer_version(version)
3351                .build();
3352
3353            let file = tempfile().unwrap();
3354            let mut writer =
3355                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3356                    .unwrap();
3357            writer.write(&batch).unwrap();
3358            writer.close().unwrap();
3359            file
3360        };
3361
3362        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3363
3364        let v1_reader = file(WriterVersion::PARQUET_1_0);
3365        let v2_reader = file(WriterVersion::PARQUET_2_0);
3366
3367        let arrow_reader =
3368            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3369        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3370
3371        let reader =
3372            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3373                .unwrap()
3374                .build()
3375                .unwrap();
3376        assert_eq!(reader.schema(), schema_without_metadata);
3377
3378        let arrow_reader =
3379            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3380        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3381
3382        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3383            .unwrap()
3384            .build()
3385            .unwrap();
3386        assert_eq!(reader.schema(), schema_without_metadata);
3387    }
3388
3389    fn write_parquet_from_iter<I, F>(value: I) -> File
3390    where
3391        I: IntoIterator<Item = (F, ArrayRef)>,
3392        F: AsRef<str>,
3393    {
3394        let batch = RecordBatch::try_from_iter(value).unwrap();
3395        let file = tempfile().unwrap();
3396        let mut writer =
3397            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3398        writer.write(&batch).unwrap();
3399        writer.close().unwrap();
3400        file
3401    }
3402
3403    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3404    where
3405        I: IntoIterator<Item = (F, ArrayRef)>,
3406        F: AsRef<str>,
3407    {
3408        let file = write_parquet_from_iter(value);
3409        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3410        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3411            file.try_clone().unwrap(),
3412            options_with_schema,
3413        );
3414        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3415    }
3416
3417    #[test]
3418    fn test_schema_too_few_columns() {
3419        run_schema_test_with_error(
3420            vec![
3421                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3422                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3423            ],
3424            Arc::new(Schema::new(vec![Field::new(
3425                "int64",
3426                ArrowDataType::Int64,
3427                false,
3428            )])),
3429            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3430        );
3431    }
3432
3433    #[test]
3434    fn test_schema_too_many_columns() {
3435        run_schema_test_with_error(
3436            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3437            Arc::new(Schema::new(vec![
3438                Field::new("int64", ArrowDataType::Int64, false),
3439                Field::new("int32", ArrowDataType::Int32, false),
3440            ])),
3441            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3442        );
3443    }
3444
3445    #[test]
3446    fn test_schema_mismatched_column_names() {
3447        run_schema_test_with_error(
3448            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3449            Arc::new(Schema::new(vec![Field::new(
3450                "other",
3451                ArrowDataType::Int64,
3452                false,
3453            )])),
3454            "Arrow: incompatible arrow schema, expected field named int64 got other",
3455        );
3456    }
3457
3458    #[test]
3459    fn test_schema_incompatible_columns() {
3460        run_schema_test_with_error(
3461            vec![
3462                (
3463                    "col1_invalid",
3464                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3465                ),
3466                (
3467                    "col2_valid",
3468                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3469                ),
3470                (
3471                    "col3_invalid",
3472                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3473                ),
3474            ],
3475            Arc::new(Schema::new(vec![
3476                Field::new("col1_invalid", ArrowDataType::Int32, false),
3477                Field::new("col2_valid", ArrowDataType::Int32, false),
3478                Field::new("col3_invalid", ArrowDataType::Int32, false),
3479            ])),
3480            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3481        );
3482    }
3483
3484    #[test]
3485    fn test_one_incompatible_nested_column() {
3486        let nested_fields = Fields::from(vec![
3487            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3488            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3489        ]);
3490        let nested = StructArray::try_new(
3491            nested_fields,
3492            vec![
3493                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3494                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3495            ],
3496            None,
3497        )
3498        .expect("struct array");
3499        let supplied_nested_fields = Fields::from(vec![
3500            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3501            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3502        ]);
3503        run_schema_test_with_error(
3504            vec![
3505                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3506                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3507                ("nested", Arc::new(nested) as ArrayRef),
3508            ],
3509            Arc::new(Schema::new(vec![
3510                Field::new("col1", ArrowDataType::Int64, false),
3511                Field::new("col2", ArrowDataType::Int32, false),
3512                Field::new(
3513                    "nested",
3514                    ArrowDataType::Struct(supplied_nested_fields),
3515                    false,
3516                ),
3517            ])),
3518            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3519        );
3520    }
3521
3522    #[test]
3523    fn test_read_binary_as_utf8() {
3524        let file = write_parquet_from_iter(vec![
3525            (
3526                "binary_to_utf8",
3527                Arc::new(BinaryArray::from(vec![
3528                    b"one".as_ref(),
3529                    b"two".as_ref(),
3530                    b"three".as_ref(),
3531                ])) as ArrayRef,
3532            ),
3533            (
3534                "large_binary_to_large_utf8",
3535                Arc::new(LargeBinaryArray::from(vec![
3536                    b"one".as_ref(),
3537                    b"two".as_ref(),
3538                    b"three".as_ref(),
3539                ])) as ArrayRef,
3540            ),
3541            (
3542                "binary_view_to_utf8_view",
3543                Arc::new(BinaryViewArray::from(vec![
3544                    b"one".as_ref(),
3545                    b"two".as_ref(),
3546                    b"three".as_ref(),
3547                ])) as ArrayRef,
3548            ),
3549        ]);
3550        let supplied_fields = Fields::from(vec![
3551            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3552            Field::new(
3553                "large_binary_to_large_utf8",
3554                ArrowDataType::LargeUtf8,
3555                false,
3556            ),
3557            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3558        ]);
3559
3560        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3561        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3562            file.try_clone().unwrap(),
3563            options,
3564        )
3565        .expect("reader builder with schema")
3566        .build()
3567        .expect("reader with schema");
3568
3569        let batch = arrow_reader.next().unwrap().unwrap();
3570        assert_eq!(batch.num_columns(), 3);
3571        assert_eq!(batch.num_rows(), 3);
3572        assert_eq!(
3573            batch
3574                .column(0)
3575                .as_string::<i32>()
3576                .iter()
3577                .collect::<Vec<_>>(),
3578            vec![Some("one"), Some("two"), Some("three")]
3579        );
3580
3581        assert_eq!(
3582            batch
3583                .column(1)
3584                .as_string::<i64>()
3585                .iter()
3586                .collect::<Vec<_>>(),
3587            vec![Some("one"), Some("two"), Some("three")]
3588        );
3589
3590        assert_eq!(
3591            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3592            vec![Some("one"), Some("two"), Some("three")]
3593        );
3594    }
3595
3596    #[test]
3597    #[should_panic(expected = "Invalid UTF8 sequence at")]
3598    fn test_read_non_utf8_binary_as_utf8() {
3599        let file = write_parquet_from_iter(vec![(
3600            "non_utf8_binary",
3601            Arc::new(BinaryArray::from(vec![
3602                b"\xDE\x00\xFF".as_ref(),
3603                b"\xDE\x01\xAA".as_ref(),
3604                b"\xDE\x02\xFF".as_ref(),
3605            ])) as ArrayRef,
3606        )]);
3607        let supplied_fields = Fields::from(vec![Field::new(
3608            "non_utf8_binary",
3609            ArrowDataType::Utf8,
3610            false,
3611        )]);
3612
3613        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3614        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3615            file.try_clone().unwrap(),
3616            options,
3617        )
3618        .expect("reader builder with schema")
3619        .build()
3620        .expect("reader with schema");
3621        arrow_reader.next().unwrap().unwrap_err();
3622    }
3623
3624    #[test]
3625    fn test_with_schema() {
3626        let nested_fields = Fields::from(vec![
3627            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3628            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3629        ]);
3630
3631        let nested_arrays: Vec<ArrayRef> = vec![
3632            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3633            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3634        ];
3635
3636        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3637
3638        let file = write_parquet_from_iter(vec![
3639            (
3640                "int32_to_ts_second",
3641                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3642            ),
3643            (
3644                "date32_to_date64",
3645                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3646            ),
3647            ("nested", Arc::new(nested) as ArrayRef),
3648        ]);
3649
3650        let supplied_nested_fields = Fields::from(vec![
3651            Field::new(
3652                "utf8_to_dict",
3653                ArrowDataType::Dictionary(
3654                    Box::new(ArrowDataType::Int32),
3655                    Box::new(ArrowDataType::Utf8),
3656                ),
3657                false,
3658            ),
3659            Field::new(
3660                "int64_to_ts_nano",
3661                ArrowDataType::Timestamp(
3662                    arrow::datatypes::TimeUnit::Nanosecond,
3663                    Some("+10:00".into()),
3664                ),
3665                false,
3666            ),
3667        ]);
3668
3669        let supplied_schema = Arc::new(Schema::new(vec![
3670            Field::new(
3671                "int32_to_ts_second",
3672                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3673                false,
3674            ),
3675            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3676            Field::new(
3677                "nested",
3678                ArrowDataType::Struct(supplied_nested_fields),
3679                false,
3680            ),
3681        ]));
3682
3683        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3684        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3685            file.try_clone().unwrap(),
3686            options,
3687        )
3688        .expect("reader builder with schema")
3689        .build()
3690        .expect("reader with schema");
3691
3692        assert_eq!(arrow_reader.schema(), supplied_schema);
3693        let batch = arrow_reader.next().unwrap().unwrap();
3694        assert_eq!(batch.num_columns(), 3);
3695        assert_eq!(batch.num_rows(), 4);
3696        assert_eq!(
3697            batch
3698                .column(0)
3699                .as_any()
3700                .downcast_ref::<TimestampSecondArray>()
3701                .expect("downcast to timestamp second")
3702                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3703                .map(|v| v.to_string())
3704                .expect("value as datetime"),
3705            "1970-01-01 01:00:00 +01:00"
3706        );
3707        assert_eq!(
3708            batch
3709                .column(1)
3710                .as_any()
3711                .downcast_ref::<Date64Array>()
3712                .expect("downcast to date64")
3713                .value_as_date(0)
3714                .map(|v| v.to_string())
3715                .expect("value as date"),
3716            "1970-01-01"
3717        );
3718
3719        let nested = batch
3720            .column(2)
3721            .as_any()
3722            .downcast_ref::<StructArray>()
3723            .expect("downcast to struct");
3724
3725        let nested_dict = nested
3726            .column(0)
3727            .as_any()
3728            .downcast_ref::<Int32DictionaryArray>()
3729            .expect("downcast to dictionary");
3730
3731        assert_eq!(
3732            nested_dict
3733                .values()
3734                .as_any()
3735                .downcast_ref::<StringArray>()
3736                .expect("downcast to string")
3737                .iter()
3738                .collect::<Vec<_>>(),
3739            vec![Some("a"), Some("b")]
3740        );
3741
3742        assert_eq!(
3743            nested_dict.keys().iter().collect::<Vec<_>>(),
3744            vec![Some(0), Some(0), Some(0), Some(1)]
3745        );
3746
3747        assert_eq!(
3748            nested
3749                .column(1)
3750                .as_any()
3751                .downcast_ref::<TimestampNanosecondArray>()
3752                .expect("downcast to timestamp nanosecond")
3753                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3754                .map(|v| v.to_string())
3755                .expect("value as datetime"),
3756            "1970-01-01 10:00:00.000000001 +10:00"
3757        );
3758    }
3759
3760    #[test]
3761    fn test_empty_projection() {
3762        let testdata = arrow::util::test_util::parquet_test_data();
3763        let path = format!("{testdata}/alltypes_plain.parquet");
3764        let file = File::open(path).unwrap();
3765
3766        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3767        let file_metadata = builder.metadata().file_metadata();
3768        let expected_rows = file_metadata.num_rows() as usize;
3769
3770        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3771        let batch_reader = builder
3772            .with_projection(mask)
3773            .with_batch_size(2)
3774            .build()
3775            .unwrap();
3776
3777        let mut total_rows = 0;
3778        for maybe_batch in batch_reader {
3779            let batch = maybe_batch.unwrap();
3780            total_rows += batch.num_rows();
3781            assert_eq!(batch.num_columns(), 0);
3782            assert!(batch.num_rows() <= 2);
3783        }
3784
3785        assert_eq!(total_rows, expected_rows);
3786    }
3787
3788    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3789        let schema = Arc::new(Schema::new(vec![Field::new(
3790            "list",
3791            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3792            true,
3793        )]));
3794
3795        let mut buf = Vec::with_capacity(1024);
3796
3797        let mut writer = ArrowWriter::try_new(
3798            &mut buf,
3799            schema.clone(),
3800            Some(
3801                WriterProperties::builder()
3802                    .set_max_row_group_size(row_group_size)
3803                    .build(),
3804            ),
3805        )
3806        .unwrap();
3807        for _ in 0..2 {
3808            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3809            for _ in 0..(batch_size) {
3810                list_builder.append(true);
3811            }
3812            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3813                .unwrap();
3814            writer.write(&batch).unwrap();
3815        }
3816        writer.close().unwrap();
3817
3818        let mut record_reader =
3819            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3820        assert_eq!(
3821            batch_size,
3822            record_reader.next().unwrap().unwrap().num_rows()
3823        );
3824        assert_eq!(
3825            batch_size,
3826            record_reader.next().unwrap().unwrap().num_rows()
3827        );
3828    }
3829
3830    #[test]
3831    fn test_row_group_exact_multiple() {
3832        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3833        test_row_group_batch(8, 8);
3834        test_row_group_batch(10, 8);
3835        test_row_group_batch(8, 10);
3836        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3837        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3838        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3839        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3840        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3841    }
3842
3843    /// Given a RecordBatch containing all the column data, return the expected batches given
3844    /// a `batch_size` and `selection`
3845    fn get_expected_batches(
3846        column: &RecordBatch,
3847        selection: &RowSelection,
3848        batch_size: usize,
3849    ) -> Vec<RecordBatch> {
3850        let mut expected_batches = vec![];
3851
3852        let mut selection: VecDeque<_> = selection.clone().into();
3853        let mut row_offset = 0;
3854        let mut last_start = None;
3855        while row_offset < column.num_rows() && !selection.is_empty() {
3856            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3857            while batch_remaining > 0 && !selection.is_empty() {
3858                let (to_read, skip) = match selection.front_mut() {
3859                    Some(selection) if selection.row_count > batch_remaining => {
3860                        selection.row_count -= batch_remaining;
3861                        (batch_remaining, selection.skip)
3862                    }
3863                    Some(_) => {
3864                        let select = selection.pop_front().unwrap();
3865                        (select.row_count, select.skip)
3866                    }
3867                    None => break,
3868                };
3869
3870                batch_remaining -= to_read;
3871
3872                match skip {
3873                    true => {
3874                        if let Some(last_start) = last_start.take() {
3875                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3876                        }
3877                        row_offset += to_read
3878                    }
3879                    false => {
3880                        last_start.get_or_insert(row_offset);
3881                        row_offset += to_read
3882                    }
3883                }
3884            }
3885        }
3886
3887        if let Some(last_start) = last_start.take() {
3888            expected_batches.push(column.slice(last_start, row_offset - last_start))
3889        }
3890
3891        // Sanity check: all batches except the final one should have `batch_size` rows
3892        for batch in &expected_batches[..expected_batches.len() - 1] {
3893            assert_eq!(batch.num_rows(), batch_size);
3894        }
3895
3896        expected_batches
3897    }
3898
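    // Illustrative sketch (not part of the original test suite): for a 6 row batch, a
    // selection of select(2)/skip(2)/select(2) and a batch_size of 2, the expected
    // output is two 2-row batches holding rows 0-1 and rows 4-5.
    #[test]
    fn test_get_expected_batches_example() {
        let column = RecordBatch::try_from_iter(vec![(
            "c",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5])) as ArrayRef,
        )])
        .unwrap();

        let selection = RowSelection::from(vec![
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(2),
        ]);

        let expected = get_expected_batches(&column, &selection, 2);
        assert_eq!(expected, vec![column.slice(0, 2), column.slice(4, 2)]);
    }
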
3899    fn create_test_selection(
3900        step_len: usize,
3901        total_len: usize,
3902        skip_first: bool,
3903    ) -> (RowSelection, usize) {
3904        let mut remaining = total_len;
3905        let mut skip = skip_first;
3906        let mut vec = vec![];
3907        let mut selected_count = 0;
3908        while remaining != 0 {
3909            let step = if remaining > step_len {
3910                step_len
3911            } else {
3912                remaining
3913            };
3914            vec.push(RowSelector {
3915                row_count: step,
3916                skip,
3917            });
3918            remaining -= step;
3919            if !skip {
3920                selected_count += step;
3921            }
3922            skip = !skip;
3923        }
3924        (vec.into(), selected_count)
3925    }
3926
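    // Illustrative sketch (not part of the original test suite): shows the alternating
    // select/skip pattern produced by `create_test_selection`, and that only the
    // non-skipped selectors contribute to the returned selected count.
    #[test]
    fn test_create_test_selection_example() {
        let (selection, selected) = create_test_selection(2, 5, false);
        let selectors: VecDeque<RowSelector> = selection.into();
        let expected = VecDeque::from(vec![
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(1),
        ]);
        assert_eq!(selectors, expected);
        assert_eq!(selected, 3);
    }
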
3927    #[test]
3928    fn test_scan_row_with_selection() {
3929        let testdata = arrow::util::test_util::parquet_test_data();
3930        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3931        let test_file = File::open(&path).unwrap();
3932
3933        let mut serial_reader =
3934            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3935        let data = serial_reader.next().unwrap().unwrap();
3936
3937        let do_test = |batch_size: usize, selection_len: usize| {
3938            for skip_first in [false, true] {
3939                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3940
3941                let expected = get_expected_batches(&data, &selections, batch_size);
3942                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3943                assert_eq!(
3944                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3945                    expected,
3946                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3947                );
3948            }
3949        };
3950
3951        // total row count 7300
3952        // 1. test selection len more than one page row count
3953        do_test(1000, 1000);
3954
3955        // 2. test selection len less than one page row count
3956        do_test(20, 20);
3957
3958        // 3. test selection_len less than batch_size
3959        do_test(20, 5);
3960
3961        // 4. test selection_len more than batch_size
3962        // (batch_size < selection_len)
3963        do_test(5, 20);
3964
3965        fn create_skip_reader(
3966            test_file: &File,
3967            batch_size: usize,
3968            selections: RowSelection,
3969        ) -> ParquetRecordBatchReader {
3970            let options = ArrowReaderOptions::new().with_page_index(true);
3971            let file = test_file.try_clone().unwrap();
3972            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3973                .unwrap()
3974                .with_batch_size(batch_size)
3975                .with_row_selection(selections)
3976                .build()
3977                .unwrap()
3978        }
3979    }
3980
3981    #[test]
3982    fn test_batch_size_overallocate() {
3983        let testdata = arrow::util::test_util::parquet_test_data();
3984        // `alltypes_plain.parquet` only has 8 rows
3985        let path = format!("{testdata}/alltypes_plain.parquet");
3986        let test_file = File::open(path).unwrap();
3987
3988        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3989        let num_rows = builder.metadata.file_metadata().num_rows();
3990        let reader = builder
3991            .with_batch_size(1024)
3992            .with_projection(ProjectionMask::all())
3993            .build()
3994            .unwrap();
3995        assert_ne!(1024, num_rows);
3996        assert_eq!(reader.batch_size, num_rows as usize);
3997    }
3998
3999    #[test]
4000    fn test_read_with_page_index_enabled() {
4001        let testdata = arrow::util::test_util::parquet_test_data();
4002
4003        {
4004            // `alltypes_tiny_pages.parquet` has page index
4005            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4006            let test_file = File::open(path).unwrap();
4007            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4008                test_file,
4009                ArrowReaderOptions::new().with_page_index(true),
4010            )
4011            .unwrap();
4012            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4013            let reader = builder.build().unwrap();
4014            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4015            assert_eq!(batches.len(), 8);
4016        }
4017
4018        {
4019            // `alltypes_plain.parquet` doesn't have page index
4020            let path = format!("{testdata}/alltypes_plain.parquet");
4021            let test_file = File::open(path).unwrap();
4022            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4023                test_file,
4024                ArrowReaderOptions::new().with_page_index(true),
4025            )
4026            .unwrap();
4027            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4028            // the file should still be read successfully.
4029            assert!(builder.metadata().offset_index().is_none());
4030            let reader = builder.build().unwrap();
4031            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4032            assert_eq!(batches.len(), 1);
4033        }
4034    }
4035
4036    #[test]
4037    fn test_raw_repetition() {
4038        const MESSAGE_TYPE: &str = "
4039            message Log {
4040              OPTIONAL INT32 eventType;
4041              REPEATED INT32 category;
4042              REPEATED group filter {
4043                OPTIONAL INT32 error;
4044              }
4045            }
4046        ";
4047        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4048        let props = Default::default();
4049
4050        let mut buf = Vec::with_capacity(1024);
4051        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4052        let mut row_group_writer = writer.next_row_group().unwrap();
4053
4054        // column 0
4055        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4056        col_writer
4057            .typed::<Int32Type>()
4058            .write_batch(&[1], Some(&[1]), None)
4059            .unwrap();
4060        col_writer.close().unwrap();
4061        // column 1
4062        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4063        col_writer
4064            .typed::<Int32Type>()
4065            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4066            .unwrap();
4067        col_writer.close().unwrap();
4068        // column 2
4069        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4070        col_writer
4071            .typed::<Int32Type>()
4072            .write_batch(&[1], Some(&[1]), Some(&[0]))
4073            .unwrap();
4074        col_writer.close().unwrap();
4075
4076        let rg_md = row_group_writer.close().unwrap();
4077        assert_eq!(rg_md.num_rows(), 1);
4078        writer.close().unwrap();
4079
4080        let bytes = Bytes::from(buf);
4081
4082        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4083        let full = no_mask.next().unwrap().unwrap();
4084
4085        assert_eq!(full.num_columns(), 3);
4086
4087        for idx in 0..3 {
4088            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4089            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4090            let mut reader = b.with_projection(mask).build().unwrap();
4091            let projected = reader.next().unwrap().unwrap();
4092
4093            assert_eq!(projected.num_columns(), 1);
4094            assert_eq!(full.column(idx), projected.column(0));
4095        }
4096    }
4097
4098    #[test]
4099    fn test_read_lz4_raw() {
4100        let testdata = arrow::util::test_util::parquet_test_data();
4101        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4102        let file = File::open(path).unwrap();
4103
4104        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4105            .unwrap()
4106            .collect::<Result<Vec<_>, _>>()
4107            .unwrap();
4108        assert_eq!(batches.len(), 1);
4109        let batch = &batches[0];
4110
4111        assert_eq!(batch.num_columns(), 3);
4112        assert_eq!(batch.num_rows(), 4);
4113
4114        // https://github.com/apache/parquet-testing/pull/18
4115        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4116        assert_eq!(
4117            a.values(),
4118            &[1593604800, 1593604800, 1593604801, 1593604801]
4119        );
4120
4121        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4122        let a: Vec<_> = a.iter().flatten().collect();
4123        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4124
4125        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4126        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4127    }
4128
4129    // This test ensures backward compatibility: it tests 2 files that declare the LZ4 CompressionCodec
4130    // but use different algorithms: LZ4_HADOOP and LZ4_RAW.
4131    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4132    //    LZ4_HADOOP algorithm for compression.
4133    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4134    //    LZ4_RAW algorithm for compression. This fallback is supported to keep backward compatibility
4135    //    with older parquet-cpp versions.
4136    //
4137    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4138    #[test]
4139    fn test_read_lz4_hadoop_fallback() {
4140        for file in [
4141            "hadoop_lz4_compressed.parquet",
4142            "non_hadoop_lz4_compressed.parquet",
4143        ] {
4144            let testdata = arrow::util::test_util::parquet_test_data();
4145            let path = format!("{testdata}/{file}");
4146            let file = File::open(path).unwrap();
4147            let expected_rows = 4;
4148
4149            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4150                .unwrap()
4151                .collect::<Result<Vec<_>, _>>()
4152                .unwrap();
4153            assert_eq!(batches.len(), 1);
4154            let batch = &batches[0];
4155
4156            assert_eq!(batch.num_columns(), 3);
4157            assert_eq!(batch.num_rows(), expected_rows);
4158
4159            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4160            assert_eq!(
4161                a.values(),
4162                &[1593604800, 1593604800, 1593604801, 1593604801]
4163            );
4164
4165            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4166            let b: Vec<_> = b.iter().flatten().collect();
4167            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4168
4169            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4170            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4171        }
4172    }
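
    // A hedged sketch, not run by the test above: the codec declared in the footer can be
    // inspected from the metadata before decoding any pages. Both files read above declare the
    // LZ4 CompressionCodec even though their page payloads use different framing (assumes
    // `Compression` from `crate::basic` is in scope).
    //
    //     let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    //     let codec = builder.metadata().row_group(0).column(0).compression();
    //     assert_eq!(codec, Compression::LZ4);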
4173
4174    #[test]
4175    fn test_read_lz4_hadoop_large() {
4176        let testdata = arrow::util::test_util::parquet_test_data();
4177        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4178        let file = File::open(path).unwrap();
4179        let expected_rows = 10000;
4180
4181        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4182            .unwrap()
4183            .collect::<Result<Vec<_>, _>>()
4184            .unwrap();
4185        assert_eq!(batches.len(), 1);
4186        let batch = &batches[0];
4187
4188        assert_eq!(batch.num_columns(), 1);
4189        assert_eq!(batch.num_rows(), expected_rows);
4190
4191        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4192        let a: Vec<_> = a.iter().flatten().collect();
4193        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4194        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4195        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4196        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4197    }
4198
4199    #[test]
4200    #[cfg(feature = "snap")]
4201    fn test_read_nested_lists() {
4202        let testdata = arrow::util::test_util::parquet_test_data();
4203        let path = format!("{testdata}/nested_lists.snappy.parquet");
4204        let file = File::open(path).unwrap();
4205
4206        let f = file.try_clone().unwrap();
4207        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4208        let expected = reader.next().unwrap().unwrap();
4209        assert_eq!(expected.num_rows(), 3);
4210
4211        let selection = RowSelection::from(vec![
4212            RowSelector::skip(1),
4213            RowSelector::select(1),
4214            RowSelector::skip(1),
4215        ]);
4216        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4217            .unwrap()
4218            .with_row_selection(selection)
4219            .build()
4220            .unwrap();
4221
4222        let actual = reader.next().unwrap().unwrap();
4223        assert_eq!(actual.num_rows(), 1);
4224        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4225    }
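
    // A minimal sketch of an alternative way to build the selection used above: the
    // skip(1)/select(1)/skip(1) pattern over the 3 rows can also be expressed as a consecutive
    // range of selected rows. Illustration only, not part of the test.
    //
    //     let selection = RowSelection::from_consecutive_ranges([1..2].into_iter(), 3);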
4226
4227    #[test]
4228    fn test_arbitrary_decimal() {
4229        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4230        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4231            .with_precision_and_scale(19, 0)
4232            .unwrap();
4233        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4234            .with_precision_and_scale(12, 0)
4235            .unwrap();
4236        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4237            .with_precision_and_scale(17, 10)
4238            .unwrap();
4239
4240        let written = RecordBatch::try_from_iter([
4241            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4242            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4243            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4244        ])
4245        .unwrap();
4246
4247        let mut buffer = Vec::with_capacity(1024);
4248        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4249        writer.write(&written).unwrap();
4250        writer.close().unwrap();
4251
4252        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4253            .unwrap()
4254            .collect::<Result<Vec<_>, _>>()
4255            .unwrap();
4256
4257        assert_eq!(&written.slice(0, 8), &read[0]);
4258    }
4259
4260    #[test]
4261    fn test_list_skip() {
4262        let mut list = ListBuilder::new(Int32Builder::new());
4263        list.append_value([Some(1), Some(2)]);
4264        list.append_value([Some(3)]);
4265        list.append_value([Some(4)]);
4266        let list = list.finish();
4267        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4268
4269        // First page contains 2 values but only 1 row
4270        let props = WriterProperties::builder()
4271            .set_data_page_row_count_limit(1)
4272            .set_write_batch_size(2)
4273            .build();
4274
4275        let mut buffer = Vec::with_capacity(1024);
4276        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4277        writer.write(&batch).unwrap();
4278        writer.close().unwrap();
4279
4280        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4281        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4282            .unwrap()
4283            .with_row_selection(selection.into())
4284            .build()
4285            .unwrap();
4286        let out = reader.next().unwrap().unwrap();
4287        assert_eq!(out.num_rows(), 1);
4288        assert_eq!(out, batch.slice(2, 1));
4289    }
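
    // The skip(2)/select(1) selection above is equivalent to an offset/limit, which the builder
    // can also express directly. A minimal sketch, not exercised here:
    //
    //     let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
    //         .unwrap()
    //         .with_offset(2)
    //         .with_limit(1)
    //         .build()
    //         .unwrap();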
4290
4291    fn test_decimal_roundtrip<T: DecimalType>() {
4292        // Precision <= 9 -> INT32
4293        // Precision <= 18 -> INT64
4294        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
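        //
        // These thresholds follow from the integer ranges: 10^9 - 1 = 999_999_999 fits in an
        // i32 (max 2_147_483_647) and 10^18 - 1 = 999_999_999_999_999_999 fits in an i64
        // (max 9_223_372_036_854_775_807), while 10^19 - 1 does not, so precision 19 requires
        // FIXED_LEN_BYTE_ARRAY.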
4295
4296        let d = |values: Vec<usize>, p: u8| {
4297            let iter = values.into_iter().map(T::Native::usize_as);
4298            PrimitiveArray::<T>::from_iter_values(iter)
4299                .with_precision_and_scale(p, 2)
4300                .unwrap()
4301        };
4302
4303        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4304        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4305        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4306        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4307
4308        let batch = RecordBatch::try_from_iter([
4309            ("d1", Arc::new(d1) as ArrayRef),
4310            ("d2", Arc::new(d2) as ArrayRef),
4311            ("d3", Arc::new(d3) as ArrayRef),
4312            ("d4", Arc::new(d4) as ArrayRef),
4313        ])
4314        .unwrap();
4315
4316        let mut buffer = Vec::with_capacity(1024);
4317        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4318        writer.write(&batch).unwrap();
4319        writer.close().unwrap();
4320
4321        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4322        let t1 = builder.parquet_schema().columns()[0].physical_type();
4323        assert_eq!(t1, PhysicalType::INT32);
4324        let t2 = builder.parquet_schema().columns()[1].physical_type();
4325        assert_eq!(t2, PhysicalType::INT64);
4326        let t3 = builder.parquet_schema().columns()[2].physical_type();
4327        assert_eq!(t3, PhysicalType::INT64);
4328        let t4 = builder.parquet_schema().columns()[3].physical_type();
4329        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4330
4331        let mut reader = builder.build().unwrap();
4332        assert_eq!(batch.schema(), reader.schema());
4333
4334        let out = reader.next().unwrap().unwrap();
4335        assert_eq!(batch, out);
4336    }
4337
4338    #[test]
4339    fn test_decimal() {
4340        test_decimal_roundtrip::<Decimal128Type>();
4341        test_decimal_roundtrip::<Decimal256Type>();
4342    }
4343
4344    #[test]
4345    fn test_list_selection() {
4346        let schema = Arc::new(Schema::new(vec![Field::new_list(
4347            "list",
4348            Field::new_list_field(ArrowDataType::Utf8, true),
4349            false,
4350        )]));
4351        let mut buf = Vec::with_capacity(1024);
4352
4353        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4354
4355        for i in 0..2 {
4356            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4357            for j in 0..1024 {
4358                list_a_builder.values().append_value(format!("{i} {j}"));
4359                list_a_builder.append(true);
4360            }
4361            let batch =
4362                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4363                    .unwrap();
4364            writer.write(&batch).unwrap();
4365        }
4366        let _metadata = writer.close().unwrap();
4367
4368        let buf = Bytes::from(buf);
4369        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4370            .unwrap()
4371            .with_row_selection(RowSelection::from(vec![
4372                RowSelector::skip(100),
4373                RowSelector::select(924),
4374                RowSelector::skip(100),
4375                RowSelector::select(924),
4376            ]))
4377            .build()
4378            .unwrap();
4379
4380        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4381        let batch = concat_batches(&schema, &batches).unwrap();
4382
4383        assert_eq!(batch.num_rows(), 924 * 2);
4384        let list = batch.column(0).as_list::<i32>();
4385
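        // Each list written above contains exactly one element, so after applying the selection
        // every pair of adjacent offsets should still differ by exactly one.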
4386        for w in list.value_offsets().windows(2) {
4387            assert_eq!(w[0] + 1, w[1])
4388        }
4389        let mut values = list.values().as_string::<i32>().iter();
4390
4391        for i in 0..2 {
4392            for j in 100..1024 {
4393                let expected = format!("{i} {j}");
4394                assert_eq!(values.next().unwrap().unwrap(), &expected);
4395            }
4396        }
4397    }
4398
4399    #[test]
4400    fn test_list_selection_fuzz() {
4401        let mut rng = rng();
4402        let schema = Arc::new(Schema::new(vec![Field::new_list(
4403            "list",
4404            Field::new_list(
4405                Field::LIST_FIELD_DEFAULT_NAME,
4406                Field::new_list_field(ArrowDataType::Int32, true),
4407                true,
4408            ),
4409            true,
4410        )]));
4411        let mut buf = Vec::with_capacity(1024);
4412        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4413
4414        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4415
4416        for _ in 0..2048 {
4417            if rng.random_bool(0.2) {
4418                list_a_builder.append(false);
4419                continue;
4420            }
4421
4422            let list_a_len = rng.random_range(0..10);
4423            let list_b_builder = list_a_builder.values();
4424
4425            for _ in 0..list_a_len {
4426                if rng.random_bool(0.2) {
4427                    list_b_builder.append(false);
4428                    continue;
4429                }
4430
4431                let list_b_len = rng.random_range(0..10);
4432                let int_builder = list_b_builder.values();
4433                for _ in 0..list_b_len {
4434                    match rng.random_bool(0.2) {
4435                        true => int_builder.append_null(),
4436                        false => int_builder.append_value(rng.random()),
4437                    }
4438                }
4439                list_b_builder.append(true)
4440            }
4441            list_a_builder.append(true);
4442        }
4443
4444        let array = Arc::new(list_a_builder.finish());
4445        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4446
4447        writer.write(&batch).unwrap();
4448        let _metadata = writer.close().unwrap();
4449
4450        let buf = Bytes::from(buf);
4451
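        // Four selection patterns: large skip-then-select and select-then-skip runs, plus
        // patterns that pick out single rows separated by long skips.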
4452        let cases = [
4453            vec![
4454                RowSelector::skip(100),
4455                RowSelector::select(924),
4456                RowSelector::skip(100),
4457                RowSelector::select(924),
4458            ],
4459            vec![
4460                RowSelector::select(924),
4461                RowSelector::skip(100),
4462                RowSelector::select(924),
4463                RowSelector::skip(100),
4464            ],
4465            vec![
4466                RowSelector::skip(1023),
4467                RowSelector::select(1),
4468                RowSelector::skip(1023),
4469                RowSelector::select(1),
4470            ],
4471            vec![
4472                RowSelector::select(1),
4473                RowSelector::skip(1023),
4474                RowSelector::select(1),
4475                RowSelector::skip(1023),
4476            ],
4477        ];
4478
4479        for batch_size in [100, 1024, 2048] {
4480            for selection in &cases {
4481                let selection = RowSelection::from(selection.clone());
4482                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4483                    .unwrap()
4484                    .with_row_selection(selection.clone())
4485                    .with_batch_size(batch_size)
4486                    .build()
4487                    .unwrap();
4488
4489                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4490                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4491                assert_eq!(actual.num_rows(), selection.row_count());
4492
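                // Walk the selectors, tracking the position in the original batch
                // (`batch_offset`) and in the filtered output (`actual_offset`), and check that
                // every selected run matches the corresponding slice of the written data.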
4493                let mut batch_offset = 0;
4494                let mut actual_offset = 0;
4495                for selector in selection.iter() {
4496                    if selector.skip {
4497                        batch_offset += selector.row_count;
4498                        continue;
4499                    }
4500
4501                    assert_eq!(
4502                        batch.slice(batch_offset, selector.row_count),
4503                        actual.slice(actual_offset, selector.row_count)
4504                    );
4505
4506                    batch_offset += selector.row_count;
4507                    actual_offset += selector.row_count;
4508                }
4509            }
4510        }
4511    }
4512
4513    #[test]
4514    fn test_read_old_nested_list() {
4515        use arrow::datatypes::DataType;
4516        use arrow::datatypes::ToByteSlice;
4517
4518        let testdata = arrow::util::test_util::parquet_test_data();
4519        // message my_record {
4520        //     REQUIRED group a (LIST) {
4521        //         REPEATED group array (LIST) {
4522        //             REPEATED INT32 array;
4523        //         }
4524        //     }
4525        // }
4526        // should be read as list<list<int32>>
4527        let path = format!("{testdata}/old_list_structure.parquet");
4528        let test_file = File::open(path).unwrap();
4529
4530        // create expected ListArray
4531        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4532
4533        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4534        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4535
4536        // Construct a list array from the above two
4537        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4538            "array",
4539            DataType::Int32,
4540            false,
4541        ))))
4542        .len(2)
4543        .add_buffer(a_value_offsets)
4544        .add_child_data(a_values.into_data())
4545        .build()
4546        .unwrap();
4547        let a = ListArray::from(a_list_data);
4548
4549        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4550        let mut reader = builder.build().unwrap();
4551        let out = reader.next().unwrap().unwrap();
4552        assert_eq!(out.num_rows(), 1);
4553        assert_eq!(out.num_columns(), 1);
4554        // grab first column
4555        let c0 = out.column(0);
4556        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4557        // get first row: [[1, 2], [3, 4]]
4558        let r0 = c0arr.value(0);
4559        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4560        assert_eq!(r0arr, &a);
4561    }
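
    // A hedged sketch (not run here): the rewritten Arrow type for the legacy two-level list
    // can also be checked from the builder's inferred schema, before any data is decoded.
    //
    //     let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    //     let field = builder.schema().field(0);
    //     assert!(matches!(field.data_type(), DataType::List(inner)
    //         if matches!(inner.data_type(), DataType::List(_))));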
4562
4563    #[test]
4564    fn test_map_no_value() {
4565        // File schema:
4566        // message schema {
4567        //   required group my_map (MAP) {
4568        //     repeated group key_value {
4569        //       required int32 key;
4570        //       optional int32 value;
4571        //     }
4572        //   }
4573        //   required group my_map_no_v (MAP) {
4574        //     repeated group key_value {
4575        //       required int32 key;
4576        //     }
4577        //   }
4578        //   required group my_list (LIST) {
4579        //     repeated group list {
4580        //       required int32 element;
4581        //     }
4582        //   }
4583        // }
4584        let testdata = arrow::util::test_util::parquet_test_data();
4585        let path = format!("{testdata}/map_no_value.parquet");
4586        let file = File::open(path).unwrap();
4587
4588        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4589            .unwrap()
4590            .build()
4591            .unwrap();
4592        let out = reader.next().unwrap().unwrap();
4593        assert_eq!(out.num_rows(), 3);
4594        assert_eq!(out.num_columns(), 3);
4595        // my_map_no_v has no value field, so it is read as a list of its keys and should be equivalent to my_list
4596        let c0 = out.column(1).as_list::<i32>();
4597        let c1 = out.column(2).as_list::<i32>();
4598        assert_eq!(c0.len(), c1.len());
4599        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4600    }
4601}