parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24use arrow_select::filter::prep_null_mask_filter;
25pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
26pub use selection::{RowSelection, RowSelector};
27use std::collections::VecDeque;
28use std::sync::Arc;
29
30pub use crate::arrow::array_reader::RowGroups;
31use crate::arrow::array_reader::{build_array_reader, ArrayReader};
32use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
33use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
34use crate::column::page::{PageIterator, PageReader};
35#[cfg(feature = "encryption")]
36use crate::encryption::decrypt::FileDecryptionProperties;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39use crate::file::reader::{ChunkReader, SerializedPageReader};
40use crate::schema::types::SchemaDescriptor;
41
42mod filter;
43mod selection;
44pub mod statistics;
45
46/// Builder for constructing Parquet readers that decode into [Apache Arrow]
47/// arrays.
48///
49/// Most users should use one of the following specializations:
50///
51/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
52/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
53///
54/// # Features
55/// * Projection pushdown: [`Self::with_projection`]
56/// * Cached metadata: [`ArrowReaderMetadata::load`]
57/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
58/// * Row group filtering: [`Self::with_row_groups`]
59/// * Range filtering: [`Self::with_row_selection`]
60/// * Row level filtering: [`Self::with_row_filter`]
61///
62/// # Implementing Predicate Pushdown
63///
64/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
65/// process, which is efficient and allows the most low level optimizations.
66///
67/// However, most Parquet based systems will apply filters at many steps prior
68/// to decoding such as pruning files, row groups and data pages. This crate
69/// provides the low level APIs needed to implement such filtering, but does not
70/// include any logic to actually evaluate predicates. For example:
71///
72/// * [`Self::with_row_groups`] for Row Group pruning
73/// * [`Self::with_row_selection`] for data page pruning
74/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
75///
76/// The rationale for this design is that implementing predicate pushdown is a
77/// complex topic and varies significantly from system to system. For example
78///
79/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
80/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
81/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
82/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
83///
84/// You can read more about this design in the [Querying Parquet with
85/// Millisecond Latency] Arrow blog post.
86///
87/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
88/// [Apache Arrow]: https://arrow.apache.org/
89/// [`StatisticsConverter`]: statistics::StatisticsConverter
90/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
91pub struct ArrowReaderBuilder<T> {
92    pub(crate) input: T,
93
94    pub(crate) metadata: Arc<ParquetMetaData>,
95
96    pub(crate) schema: SchemaRef,
97
98    pub(crate) fields: Option<Arc<ParquetField>>,
99
100    pub(crate) batch_size: usize,
101
102    pub(crate) row_groups: Option<Vec<usize>>,
103
104    pub(crate) projection: ProjectionMask,
105
106    pub(crate) filter: Option<RowFilter>,
107
108    pub(crate) selection: Option<RowSelection>,
109
110    pub(crate) limit: Option<usize>,
111
112    pub(crate) offset: Option<usize>,
113}
114
115impl<T> ArrowReaderBuilder<T> {
116    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
117        Self {
118            input,
119            metadata: metadata.metadata,
120            schema: metadata.schema,
121            fields: metadata.fields,
122            batch_size: 1024,
123            row_groups: None,
124            projection: ProjectionMask::all(),
125            filter: None,
126            selection: None,
127            limit: None,
128            offset: None,
129        }
130    }
131
132    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
133    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
134        &self.metadata
135    }
136
137    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
138    pub fn parquet_schema(&self) -> &SchemaDescriptor {
139        self.metadata.file_metadata().schema_descr()
140    }
141
142    /// Returns the arrow [`SchemaRef`] for this parquet file
143    pub fn schema(&self) -> &SchemaRef {
144        &self.schema
145    }
146
147    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
148    /// If the batch_size more than the file row count, use the file row count.
149    pub fn with_batch_size(self, batch_size: usize) -> Self {
150        // Try to avoid allocate large buffer
151        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
152        Self { batch_size, ..self }
153    }
154
155    /// Only read data from the provided row group indexes
156    ///
157    /// This is also called row group filtering
158    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
159        Self {
160            row_groups: Some(row_groups),
161            ..self
162        }
163    }
164
165    /// Only read data from the provided column indexes
166    pub fn with_projection(self, mask: ProjectionMask) -> Self {
167        Self {
168            projection: mask,
169            ..self
170        }
171    }
172
173    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
174    /// data into memory.
175    ///
176    /// This feature is used to restrict which rows are decoded within row
177    /// groups, skipping ranges of rows that are not needed. Such selections
178    /// could be determined by evaluating predicates against the parquet page
179    /// [`Index`] or some other external information available to a query
180    /// engine.
181    ///
182    /// # Notes
183    ///
184    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
185    /// applying the row selection, and therefore rows from skipped row groups
186    /// should not be included in the [`RowSelection`] (see example below)
187    ///
188    /// It is recommended to enable writing the page index if using this
189    /// functionality, to allow more efficient skipping over data pages. See
190    /// [`ArrowReaderOptions::with_page_index`].
191    ///
192    /// # Example
193    ///
194    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
195    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
196    /// row group 3:
197    ///
198    /// ```text
199    ///   Row Group 0, 1000 rows (selected)
200    ///   Row Group 1, 1000 rows (skipped)
201    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
202    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
203    /// ```
204    ///
205    /// You could pass the following [`RowSelection`]:
206    ///
207    /// ```text
208    ///  Select 1000    (scan all rows in row group 0)
209    ///  Skip 50        (skip the first 50 rows in row group 2)
210    ///  Select 50      (scan rows 50-100 in row group 2)
211    ///  Skip 900       (skip the remaining rows in row group 2)
212    ///  Skip 200       (skip the first 200 rows in row group 3)
213    ///  Select 100     (scan rows 200-300 in row group 3)
214    ///  Skip 700       (skip the remaining rows in row group 3)
215    /// ```
216    /// Note there is no entry for the (entirely) skipped row group 1.
217    ///
218    /// Note you can represent the same selection with fewer entries. Instead of
219    ///
220    /// ```text
221    ///  Skip 900       (skip the remaining rows in row group 2)
222    ///  Skip 200       (skip the first 200 rows in row group 3)
223    /// ```
224    ///
225    /// you could use
226    ///
227    /// ```text
228    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
229    /// ```
230    ///
231    /// [`Index`]: crate::file::page_index::index::Index
232    pub fn with_row_selection(self, selection: RowSelection) -> Self {
233        Self {
234            selection: Some(selection),
235            ..self
236        }
237    }
238
239    /// Provide a [`RowFilter`] to skip decoding rows
240    ///
241    /// Row filters are applied after row group selection and row selection
242    ///
243    /// It is recommended to enable reading the page index if using this functionality, to allow
244    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
245    pub fn with_row_filter(self, filter: RowFilter) -> Self {
246        Self {
247            filter: Some(filter),
248            ..self
249        }
250    }
251
252    /// Provide a limit to the number of rows to be read
253    ///
254    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
255    /// allowing it to limit the final set of rows decoded after any pushed down predicates
256    ///
257    /// It is recommended to enable reading the page index if using this functionality, to allow
258    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
259    pub fn with_limit(self, limit: usize) -> Self {
260        Self {
261            limit: Some(limit),
262            ..self
263        }
264    }
265
266    /// Provide an offset to skip over the given number of rows
267    ///
268    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
269    /// allowing it to skip rows after any pushed down predicates
270    ///
271    /// It is recommended to enable reading the page index if using this functionality, to allow
272    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
273    pub fn with_offset(self, offset: usize) -> Self {
274        Self {
275            offset: Some(offset),
276            ..self
277        }
278    }
279}
280
281/// Options that control how metadata is read for a parquet file
282///
283/// See [`ArrowReaderBuilder`] for how to configure how the column data
284/// is then read from the file, including projection and filter pushdown
285#[derive(Debug, Clone, Default)]
286pub struct ArrowReaderOptions {
287    /// Should the reader strip any user defined metadata from the Arrow schema
288    skip_arrow_metadata: bool,
289    /// If provided used as the schema for the file, otherwise the schema is read from the file
290    supplied_schema: Option<SchemaRef>,
291    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
292    pub(crate) page_index: bool,
293    /// If encryption is enabled, the file decryption properties can be provided
294    #[cfg(feature = "encryption")]
295    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
296}
297
298impl ArrowReaderOptions {
299    /// Create a new [`ArrowReaderOptions`] with the default settings
300    pub fn new() -> Self {
301        Self::default()
302    }
303
304    /// Skip decoding the embedded arrow metadata (defaults to `false`)
305    ///
306    /// Parquet files generated by some writers may contain embedded arrow
307    /// schema and metadata.
308    /// This may not be correct or compatible with your system,
309    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
310    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
311        Self {
312            skip_arrow_metadata,
313            ..self
314        }
315    }
316
317    /// Provide a schema to use when reading the parquet file. If provided it
318    /// takes precedence over the schema inferred from the file or the schema defined
319    /// in the file's metadata. If the schema is not compatible with the file's
320    /// schema an error will be returned when constructing the builder.
321    ///
322    /// This option is only required if you want to cast columns to a different type.
323    /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
324    /// in the Arrow schema.
325    ///
326    /// The supplied schema must have the same number of columns as the parquet schema and
327    /// the column names need to be the same.
328    ///
329    /// # Example
330    /// ```
331    /// use std::io::Bytes;
332    /// use std::sync::Arc;
333    /// use tempfile::tempfile;
334    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
335    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
336    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
337    /// use parquet::arrow::ArrowWriter;
338    ///
339    /// // Write data - schema is inferred from the data to be Int32
340    /// let file = tempfile().unwrap();
341    /// let batch = RecordBatch::try_from_iter(vec![
342    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
343    /// ]).unwrap();
344    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
345    /// writer.write(&batch).unwrap();
346    /// writer.close().unwrap();
347    ///
348    /// // Read the file back.
349    /// // Supply a schema that interprets the Int32 column as a Timestamp.
350    /// let supplied_schema = Arc::new(Schema::new(vec![
351    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
352    /// ]));
353    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
354    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
355    ///     file.try_clone().unwrap(),
356    ///     options
357    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
358    ///
359    /// // Create the reader and read the data using the supplied schema.
360    /// let mut reader = builder.build().unwrap();
361    /// let _batch = reader.next().unwrap().unwrap();
362    /// ```
363    pub fn with_schema(self, schema: SchemaRef) -> Self {
364        Self {
365            supplied_schema: Some(schema),
366            skip_arrow_metadata: true,
367            ..self
368        }
369    }
370
371    /// Enable reading [`PageIndex`], if present (defaults to `false`)
372    ///
373    /// The `PageIndex` can be used to push down predicates to the parquet scan,
374    /// potentially eliminating unnecessary IO, by some query engines.
375    ///
376    /// If this is enabled, [`ParquetMetaData::column_index`] and
377    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
378    /// information is present in the file.
379    ///
380    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
381    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
382    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
383    pub fn with_page_index(self, page_index: bool) -> Self {
384        Self { page_index, ..self }
385    }
386
387    /// Provide the file decryption properties to use when reading encrypted parquet files.
388    ///
389    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
390    #[cfg(feature = "encryption")]
391    pub fn with_file_decryption_properties(
392        self,
393        file_decryption_properties: FileDecryptionProperties,
394    ) -> Self {
395        Self {
396            file_decryption_properties: Some(file_decryption_properties),
397            ..self
398        }
399    }
400
401    /// Retrieve the currently set page index behavior.
402    ///
403    /// This can be set via [`with_page_index`][Self::with_page_index].
404    pub fn page_index(&self) -> bool {
405        self.page_index
406    }
407
408    /// Retrieve the currently set file decryption properties.
409    ///
410    /// This can be set via
411    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
412    #[cfg(feature = "encryption")]
413    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
414        self.file_decryption_properties.as_ref()
415    }
416}
417
418/// The metadata necessary to construct a [`ArrowReaderBuilder`]
419///
420/// Note this structure is cheaply clone-able as it consists of several arcs.
421///
422/// This structure allows
423///
424/// 1. Loading metadata for a file once and then using that same metadata to
425///    construct multiple separate readers, for example, to distribute readers
426///    across multiple threads
427///
428/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
429///    from the file each time a reader is constructed.
430///
431/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
432#[derive(Debug, Clone)]
433pub struct ArrowReaderMetadata {
434    /// The Parquet Metadata, if known aprior
435    pub(crate) metadata: Arc<ParquetMetaData>,
436    /// The Arrow Schema
437    pub(crate) schema: SchemaRef,
438
439    pub(crate) fields: Option<Arc<ParquetField>>,
440}
441
442impl ArrowReaderMetadata {
443    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
444    ///
445    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
446    /// example of how this can be used
447    ///
448    /// # Notes
449    ///
450    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
451    /// `Self::metadata` is missing the page index, this function will attempt
452    /// to load the page index by making an object store request.
453    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
454        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
455        #[cfg(feature = "encryption")]
456        let metadata =
457            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
458        let metadata = metadata.parse_and_finish(reader)?;
459        Self::try_new(Arc::new(metadata), options)
460    }
461
462    /// Create a new [`ArrowReaderMetadata`]
463    ///
464    /// # Notes
465    ///
466    /// This function does not attempt to load the PageIndex if not present in the metadata.
467    /// See [`Self::load`] for more details.
468    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
469        match options.supplied_schema {
470            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
471            None => {
472                let kv_metadata = match options.skip_arrow_metadata {
473                    true => None,
474                    false => metadata.file_metadata().key_value_metadata(),
475                };
476
477                let (schema, fields) = parquet_to_arrow_schema_and_fields(
478                    metadata.file_metadata().schema_descr(),
479                    ProjectionMask::all(),
480                    kv_metadata,
481                )?;
482
483                Ok(Self {
484                    metadata,
485                    schema: Arc::new(schema),
486                    fields: fields.map(Arc::new),
487                })
488            }
489        }
490    }
491
492    fn with_supplied_schema(
493        metadata: Arc<ParquetMetaData>,
494        supplied_schema: SchemaRef,
495    ) -> Result<Self> {
496        let parquet_schema = metadata.file_metadata().schema_descr();
497        let field_levels = parquet_to_arrow_field_levels(
498            parquet_schema,
499            ProjectionMask::all(),
500            Some(supplied_schema.fields()),
501        )?;
502        let fields = field_levels.fields;
503        let inferred_len = fields.len();
504        let supplied_len = supplied_schema.fields().len();
505        // Ensure the supplied schema has the same number of columns as the parquet schema.
506        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
507        // different lengths, but we check here to be safe.
508        if inferred_len != supplied_len {
509            Err(arrow_err!(format!(
510                "incompatible arrow schema, expected {} columns received {}",
511                inferred_len, supplied_len
512            )))
513        } else {
514            let diff_fields: Vec<_> = supplied_schema
515                .fields()
516                .iter()
517                .zip(fields.iter())
518                .filter_map(|(field1, field2)| {
519                    if field1 != field2 {
520                        Some(field1.name().clone())
521                    } else {
522                        None
523                    }
524                })
525                .collect();
526
527            if !diff_fields.is_empty() {
528                Err(ParquetError::ArrowError(format!(
529                    "incompatible arrow schema, the following fields could not be cast: [{}]",
530                    diff_fields.join(", ")
531                )))
532            } else {
533                Ok(Self {
534                    metadata,
535                    schema: supplied_schema,
536                    fields: field_levels.levels.map(Arc::new),
537                })
538            }
539        }
540    }
541
542    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
543    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
544        &self.metadata
545    }
546
547    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
548    pub fn parquet_schema(&self) -> &SchemaDescriptor {
549        self.metadata.file_metadata().schema_descr()
550    }
551
552    /// Returns the arrow [`SchemaRef`] for this parquet file
553    pub fn schema(&self) -> &SchemaRef {
554        &self.schema
555    }
556}
557
558#[doc(hidden)]
559/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
560pub struct SyncReader<T: ChunkReader>(T);
561
562/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
563///
564/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
565///
566/// See [`ArrowReaderBuilder`] for additional member functions
567pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
568
569impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
570    /// Create a new [`ParquetRecordBatchReaderBuilder`]
571    ///
572    /// ```
573    /// # use std::sync::Arc;
574    /// # use bytes::Bytes;
575    /// # use arrow_array::{Int32Array, RecordBatch};
576    /// # use arrow_schema::{DataType, Field, Schema};
577    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
578    /// # use parquet::arrow::ArrowWriter;
579    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
580    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
581    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
582    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
583    /// # writer.write(&batch).unwrap();
584    /// # writer.close().unwrap();
585    /// # let file = Bytes::from(file);
586    /// #
587    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
588    ///
589    /// // Inspect metadata
590    /// assert_eq!(builder.metadata().num_row_groups(), 1);
591    ///
592    /// // Construct reader
593    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
594    ///
595    /// // Read data
596    /// let _batch = reader.next().unwrap().unwrap();
597    /// ```
598    pub fn try_new(reader: T) -> Result<Self> {
599        Self::try_new_with_options(reader, Default::default())
600    }
601
602    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
603    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
604        let metadata = ArrowReaderMetadata::load(&reader, options)?;
605        Ok(Self::new_with_metadata(reader, metadata))
606    }
607
608    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
609    ///
610    /// This interface allows:
611    ///
612    /// 1. Loading metadata once and using it to create multiple builders with
613    ///    potentially different settings or run on different threads
614    ///
615    /// 2. Using a cached copy of the metadata rather than re-reading it from the
616    ///    file each time a reader is constructed.
617    ///
618    /// See the docs on [`ArrowReaderMetadata`] for more details
619    ///
620    /// # Example
621    /// ```
622    /// # use std::fs::metadata;
623    /// # use std::sync::Arc;
624    /// # use bytes::Bytes;
625    /// # use arrow_array::{Int32Array, RecordBatch};
626    /// # use arrow_schema::{DataType, Field, Schema};
627    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
628    /// # use parquet::arrow::ArrowWriter;
629    /// #
630    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
631    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
632    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
633    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
634    /// # writer.write(&batch).unwrap();
635    /// # writer.close().unwrap();
636    /// # let file = Bytes::from(file);
637    /// #
638    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
639    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
640    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
641    ///
642    /// // Should be able to read from both in parallel
643    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
644    /// ```
645    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
646        Self::new_builder(SyncReader(input), metadata)
647    }
648
649    /// Build a [`ParquetRecordBatchReader`]
650    ///
651    /// Note: this will eagerly evaluate any `RowFilter` before returning
652    pub fn build(self) -> Result<ParquetRecordBatchReader> {
653        // Try to avoid allocate large buffer
654        let batch_size = self
655            .batch_size
656            .min(self.metadata.file_metadata().num_rows() as usize);
657
658        let row_groups = self
659            .row_groups
660            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
661
662        let reader = ReaderRowGroups {
663            reader: Arc::new(self.input.0),
664            metadata: self.metadata,
665            row_groups,
666        };
667
668        let mut filter = self.filter;
669        let mut selection = self.selection;
670
671        if let Some(filter) = filter.as_mut() {
672            for predicate in filter.predicates.iter_mut() {
673                if !selects_any(selection.as_ref()) {
674                    break;
675                }
676
677                let array_reader =
678                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
679
680                selection = Some(evaluate_predicate(
681                    batch_size,
682                    array_reader,
683                    selection,
684                    predicate.as_mut(),
685                )?);
686            }
687        }
688
689        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
690
691        // If selection is empty, truncate
692        if !selects_any(selection.as_ref()) {
693            selection = Some(RowSelection::from(vec![]));
694        }
695
696        Ok(ParquetRecordBatchReader::new(
697            batch_size,
698            array_reader,
699            apply_range(selection, reader.num_rows(), self.offset, self.limit),
700        ))
701    }
702}
703
704struct ReaderRowGroups<T: ChunkReader> {
705    reader: Arc<T>,
706
707    metadata: Arc<ParquetMetaData>,
708    /// Optional list of row group indices to scan
709    row_groups: Vec<usize>,
710}
711
712impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
713    fn num_rows(&self) -> usize {
714        let meta = self.metadata.row_groups();
715        self.row_groups
716            .iter()
717            .map(|x| meta[*x].num_rows() as usize)
718            .sum()
719    }
720
721    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
722        Ok(Box::new(ReaderPageIterator {
723            column_idx: i,
724            reader: self.reader.clone(),
725            metadata: self.metadata.clone(),
726            row_groups: self.row_groups.clone().into_iter(),
727        }))
728    }
729}
730
731struct ReaderPageIterator<T: ChunkReader> {
732    reader: Arc<T>,
733    column_idx: usize,
734    row_groups: std::vec::IntoIter<usize>,
735    metadata: Arc<ParquetMetaData>,
736}
737
738impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
739    /// Return the next SerializedPageReader
740    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
741        let rg = self.metadata.row_group(rg_idx);
742        let column_chunk_metadata = rg.column(self.column_idx);
743        let offset_index = self.metadata.offset_index();
744        // `offset_index` may not exist and `i[rg_idx]` will be empty.
745        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
746        let page_locations = offset_index
747            .filter(|i| !i[rg_idx].is_empty())
748            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
749        let total_rows = rg.num_rows() as usize;
750        let reader = self.reader.clone();
751
752        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
753            .add_crypto_context(
754                rg_idx,
755                self.column_idx,
756                self.metadata.as_ref(),
757                column_chunk_metadata,
758            )
759    }
760}
761
762impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
763    type Item = Result<Box<dyn PageReader>>;
764
765    fn next(&mut self) -> Option<Self::Item> {
766        let rg_idx = self.row_groups.next()?;
767        let page_reader = self
768            .next_page_reader(rg_idx)
769            .map(|page_reader| Box::new(page_reader) as _);
770        Some(page_reader)
771    }
772}
773
774impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
775
776/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
777/// read from a parquet data source
778pub struct ParquetRecordBatchReader {
779    batch_size: usize,
780    array_reader: Box<dyn ArrayReader>,
781    schema: SchemaRef,
782    selection: Option<VecDeque<RowSelector>>,
783}
784
785impl Iterator for ParquetRecordBatchReader {
786    type Item = Result<RecordBatch, ArrowError>;
787
788    fn next(&mut self) -> Option<Self::Item> {
789        let mut read_records = 0;
790        match self.selection.as_mut() {
791            Some(selection) => {
792                while read_records < self.batch_size && !selection.is_empty() {
793                    let front = selection.pop_front().unwrap();
794                    if front.skip {
795                        let skipped = match self.array_reader.skip_records(front.row_count) {
796                            Ok(skipped) => skipped,
797                            Err(e) => return Some(Err(e.into())),
798                        };
799
800                        if skipped != front.row_count {
801                            return Some(Err(general_err!(
802                                "failed to skip rows, expected {}, got {}",
803                                front.row_count,
804                                skipped
805                            )
806                            .into()));
807                        }
808                        continue;
809                    }
810
811                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
812                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
813                    if front.row_count == 0 {
814                        continue;
815                    }
816
817                    // try to read record
818                    let need_read = self.batch_size - read_records;
819                    let to_read = match front.row_count.checked_sub(need_read) {
820                        Some(remaining) if remaining != 0 => {
821                            // if page row count less than batch_size we must set batch size to page row count.
822                            // add check avoid dead loop
823                            selection.push_front(RowSelector::select(remaining));
824                            need_read
825                        }
826                        _ => front.row_count,
827                    };
828                    match self.array_reader.read_records(to_read) {
829                        Ok(0) => break,
830                        Ok(rec) => read_records += rec,
831                        Err(error) => return Some(Err(error.into())),
832                    }
833                }
834            }
835            None => {
836                if let Err(error) = self.array_reader.read_records(self.batch_size) {
837                    return Some(Err(error.into()));
838                }
839            }
840        };
841
842        match self.array_reader.consume_batch() {
843            Err(error) => Some(Err(error.into())),
844            Ok(array) => {
845                let struct_array = array.as_struct_opt().ok_or_else(|| {
846                    ArrowError::ParquetError(
847                        "Struct array reader should return struct array".to_string(),
848                    )
849                });
850
851                match struct_array {
852                    Err(err) => Some(Err(err)),
853                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
854                }
855            }
856        }
857    }
858}
859
860impl RecordBatchReader for ParquetRecordBatchReader {
861    /// Returns the projected [`SchemaRef`] for reading the parquet file.
862    ///
863    /// Note that the schema metadata will be stripped here. See
864    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
865    fn schema(&self) -> SchemaRef {
866        self.schema.clone()
867    }
868}
869
870impl ParquetRecordBatchReader {
871    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
872    ///
873    /// See [`ParquetRecordBatchReaderBuilder`] for more options
874    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
875        ParquetRecordBatchReaderBuilder::try_new(reader)?
876            .with_batch_size(batch_size)
877            .build()
878    }
879
880    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
881    ///
882    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
883    /// higher-level interface for reading parquet data from a file
884    pub fn try_new_with_row_groups(
885        levels: &FieldLevels,
886        row_groups: &dyn RowGroups,
887        batch_size: usize,
888        selection: Option<RowSelection>,
889    ) -> Result<Self> {
890        let array_reader =
891            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
892
893        Ok(Self {
894            batch_size,
895            array_reader,
896            schema: Arc::new(Schema::new(levels.fields.clone())),
897            selection: selection.map(|s| s.trim().into()),
898        })
899    }
900
901    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
902    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
903    /// all rows will be returned
904    pub(crate) fn new(
905        batch_size: usize,
906        array_reader: Box<dyn ArrayReader>,
907        selection: Option<RowSelection>,
908    ) -> Self {
909        let schema = match array_reader.get_data_type() {
910            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
911            _ => unreachable!("Struct array reader's data type is not struct!"),
912        };
913
914        Self {
915            batch_size,
916            array_reader,
917            schema: Arc::new(schema),
918            selection: selection.map(|s| s.trim().into()),
919        }
920    }
921}
922
923/// Returns `true` if `selection` is `None` or selects some rows
924pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
925    selection.map(|x| x.selects_any()).unwrap_or(true)
926}
927
928/// Applies an optional offset and limit to an optional [`RowSelection`]
929pub(crate) fn apply_range(
930    mut selection: Option<RowSelection>,
931    row_count: usize,
932    offset: Option<usize>,
933    limit: Option<usize>,
934) -> Option<RowSelection> {
935    // If an offset is defined, apply it to the `selection`
936    if let Some(offset) = offset {
937        selection = Some(match row_count.checked_sub(offset) {
938            None => RowSelection::from(vec![]),
939            Some(remaining) => selection
940                .map(|selection| selection.offset(offset))
941                .unwrap_or_else(|| {
942                    RowSelection::from(vec![
943                        RowSelector::skip(offset),
944                        RowSelector::select(remaining),
945                    ])
946                }),
947        });
948    }
949
950    // If a limit is defined, apply it to the final `selection`
951    if let Some(limit) = limit {
952        selection = Some(
953            selection
954                .map(|selection| selection.limit(limit))
955                .unwrap_or_else(|| {
956                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
957                }),
958        );
959    }
960    selection
961}
962
963/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
964/// which rows to return.
965///
966/// `input_selection`: Optional pre-existing selection. If `Some`, then the
967/// final [`RowSelection`] will be the conjunction of it and the rows selected
968/// by `predicate`.
969///
970/// Note: A pre-existing selection may come from evaluating a previous predicate
971/// or if the [`ParquetRecordBatchReader`] specified an explicit
972/// [`RowSelection`] in addition to one or more predicates.
973pub(crate) fn evaluate_predicate(
974    batch_size: usize,
975    array_reader: Box<dyn ArrayReader>,
976    input_selection: Option<RowSelection>,
977    predicate: &mut dyn ArrowPredicate,
978) -> Result<RowSelection> {
979    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
980    let mut filters = vec![];
981    for maybe_batch in reader {
982        let maybe_batch = maybe_batch?;
983        let input_rows = maybe_batch.num_rows();
984        let filter = predicate.evaluate(maybe_batch)?;
985        // Since user supplied predicate, check error here to catch bugs quickly
986        if filter.len() != input_rows {
987            return Err(arrow_err!(
988                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
989                filter.len()
990            ));
991        }
992        match filter.null_count() {
993            0 => filters.push(filter),
994            _ => filters.push(prep_null_mask_filter(&filter)),
995        };
996    }
997
998    let raw = RowSelection::from_filters(&filters);
999    Ok(match input_selection {
1000        Some(selection) => selection.and_then(&raw),
1001        None => raw,
1002    })
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007    use std::cmp::min;
1008    use std::collections::{HashMap, VecDeque};
1009    use std::fmt::Formatter;
1010    use std::fs::File;
1011    use std::io::Seek;
1012    use std::path::PathBuf;
1013    use std::sync::Arc;
1014
1015    use arrow_array::builder::*;
1016    use arrow_array::cast::AsArray;
1017    use arrow_array::types::{
1018        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
1019        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
1020    };
1021    use arrow_array::*;
1022    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
1023    use arrow_data::{ArrayData, ArrayDataBuilder};
1024    use arrow_schema::{
1025        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1026    };
1027    use arrow_select::concat::concat_batches;
1028    use bytes::Bytes;
1029    use half::f16;
1030    use num::PrimInt;
1031    use rand::{rng, Rng, RngCore};
1032    use tempfile::tempfile;
1033
1034    use crate::arrow::arrow_reader::{
1035        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1036        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1037    };
1038    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1039    use crate::arrow::{ArrowWriter, ProjectionMask};
1040    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1041    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1042    use crate::data_type::{
1043        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1044        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1045    };
1046    use crate::errors::Result;
1047    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1048    use crate::file::writer::SerializedFileWriter;
1049    use crate::schema::parser::parse_message_type;
1050    use crate::schema::types::{Type, TypePtr};
1051    use crate::util::test_common::rand_gen::RandGen;
1052
1053    #[test]
1054    fn test_arrow_reader_all_columns() {
1055        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1056
1057        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1058        let original_schema = Arc::clone(builder.schema());
1059        let reader = builder.build().unwrap();
1060
1061        // Verify that the schema was correctly parsed
1062        assert_eq!(original_schema.fields(), reader.schema().fields());
1063    }
1064
1065    #[test]
1066    fn test_arrow_reader_single_column() {
1067        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1068
1069        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1070        let original_schema = Arc::clone(builder.schema());
1071
1072        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1073        let reader = builder.with_projection(mask).build().unwrap();
1074
1075        // Verify that the schema was correctly parsed
1076        assert_eq!(1, reader.schema().fields().len());
1077        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1078    }
1079
1080    #[test]
1081    fn test_arrow_reader_single_column_by_name() {
1082        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1083
1084        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1085        let original_schema = Arc::clone(builder.schema());
1086
1087        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1088        let reader = builder.with_projection(mask).build().unwrap();
1089
1090        // Verify that the schema was correctly parsed
1091        assert_eq!(1, reader.schema().fields().len());
1092        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1093    }
1094
1095    #[test]
1096    fn test_null_column_reader_test() {
1097        let mut file = tempfile::tempfile().unwrap();
1098
1099        let schema = "
1100            message message {
1101                OPTIONAL INT32 int32;
1102            }
1103        ";
1104        let schema = Arc::new(parse_message_type(schema).unwrap());
1105
1106        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1107        generate_single_column_file_with_data::<Int32Type>(
1108            &[vec![], vec![]],
1109            Some(&def_levels),
1110            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1111            schema,
1112            Some(Field::new("int32", ArrowDataType::Null, true)),
1113            &Default::default(),
1114        )
1115        .unwrap();
1116
1117        file.rewind().unwrap();
1118
1119        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1120        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1121
1122        assert_eq!(batches.len(), 4);
1123        for batch in &batches[0..3] {
1124            assert_eq!(batch.num_rows(), 2);
1125            assert_eq!(batch.num_columns(), 1);
1126            assert_eq!(batch.column(0).null_count(), 2);
1127        }
1128
1129        assert_eq!(batches[3].num_rows(), 1);
1130        assert_eq!(batches[3].num_columns(), 1);
1131        assert_eq!(batches[3].column(0).null_count(), 1);
1132    }
1133
1134    #[test]
1135    fn test_primitive_single_column_reader_test() {
1136        run_single_column_reader_tests::<BoolType, _, BoolType>(
1137            2,
1138            ConvertedType::NONE,
1139            None,
1140            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1141            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1142        );
1143        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1144            2,
1145            ConvertedType::NONE,
1146            None,
1147            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1148            &[
1149                Encoding::PLAIN,
1150                Encoding::RLE_DICTIONARY,
1151                Encoding::DELTA_BINARY_PACKED,
1152                Encoding::BYTE_STREAM_SPLIT,
1153            ],
1154        );
1155        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1156            2,
1157            ConvertedType::NONE,
1158            None,
1159            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1160            &[
1161                Encoding::PLAIN,
1162                Encoding::RLE_DICTIONARY,
1163                Encoding::DELTA_BINARY_PACKED,
1164                Encoding::BYTE_STREAM_SPLIT,
1165            ],
1166        );
1167        run_single_column_reader_tests::<FloatType, _, FloatType>(
1168            2,
1169            ConvertedType::NONE,
1170            None,
1171            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1172            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1173        );
1174    }
1175
1176    #[test]
1177    fn test_unsigned_primitive_single_column_reader_test() {
1178        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1179            2,
1180            ConvertedType::UINT_32,
1181            Some(ArrowDataType::UInt32),
1182            |vals| {
1183                Arc::new(UInt32Array::from_iter(
1184                    vals.iter().map(|x| x.map(|x| x as u32)),
1185                ))
1186            },
1187            &[
1188                Encoding::PLAIN,
1189                Encoding::RLE_DICTIONARY,
1190                Encoding::DELTA_BINARY_PACKED,
1191            ],
1192        );
1193        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1194            2,
1195            ConvertedType::UINT_64,
1196            Some(ArrowDataType::UInt64),
1197            |vals| {
1198                Arc::new(UInt64Array::from_iter(
1199                    vals.iter().map(|x| x.map(|x| x as u64)),
1200                ))
1201            },
1202            &[
1203                Encoding::PLAIN,
1204                Encoding::RLE_DICTIONARY,
1205                Encoding::DELTA_BINARY_PACKED,
1206            ],
1207        );
1208    }
1209
1210    #[test]
1211    fn test_unsigned_roundtrip() {
1212        let schema = Arc::new(Schema::new(vec![
1213            Field::new("uint32", ArrowDataType::UInt32, true),
1214            Field::new("uint64", ArrowDataType::UInt64, true),
1215        ]));
1216
1217        let mut buf = Vec::with_capacity(1024);
1218        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1219
1220        let original = RecordBatch::try_new(
1221            schema,
1222            vec![
1223                Arc::new(UInt32Array::from_iter_values([
1224                    0,
1225                    i32::MAX as u32,
1226                    u32::MAX,
1227                ])),
1228                Arc::new(UInt64Array::from_iter_values([
1229                    0,
1230                    i64::MAX as u64,
1231                    u64::MAX,
1232                ])),
1233            ],
1234        )
1235        .unwrap();
1236
1237        writer.write(&original).unwrap();
1238        writer.close().unwrap();
1239
1240        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1241        let ret = reader.next().unwrap().unwrap();
1242        assert_eq!(ret, original);
1243
1244        // Check they can be downcast to the correct type
1245        ret.column(0)
1246            .as_any()
1247            .downcast_ref::<UInt32Array>()
1248            .unwrap();
1249
1250        ret.column(1)
1251            .as_any()
1252            .downcast_ref::<UInt64Array>()
1253            .unwrap();
1254    }
1255
1256    #[test]
1257    fn test_float16_roundtrip() -> Result<()> {
1258        let schema = Arc::new(Schema::new(vec![
1259            Field::new("float16", ArrowDataType::Float16, false),
1260            Field::new("float16-nullable", ArrowDataType::Float16, true),
1261        ]));
1262
1263        let mut buf = Vec::with_capacity(1024);
1264        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1265
1266        let original = RecordBatch::try_new(
1267            schema,
1268            vec![
1269                Arc::new(Float16Array::from_iter_values([
1270                    f16::EPSILON,
1271                    f16::MIN,
1272                    f16::MAX,
1273                    f16::NAN,
1274                    f16::INFINITY,
1275                    f16::NEG_INFINITY,
1276                    f16::ONE,
1277                    f16::NEG_ONE,
1278                    f16::ZERO,
1279                    f16::NEG_ZERO,
1280                    f16::E,
1281                    f16::PI,
1282                    f16::FRAC_1_PI,
1283                ])),
1284                Arc::new(Float16Array::from(vec![
1285                    None,
1286                    None,
1287                    None,
1288                    Some(f16::NAN),
1289                    Some(f16::INFINITY),
1290                    Some(f16::NEG_INFINITY),
1291                    None,
1292                    None,
1293                    None,
1294                    None,
1295                    None,
1296                    None,
1297                    Some(f16::FRAC_1_PI),
1298                ])),
1299            ],
1300        )?;
1301
1302        writer.write(&original)?;
1303        writer.close()?;
1304
1305        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1306        let ret = reader.next().unwrap()?;
1307        assert_eq!(ret, original);
1308
1309        // Ensure can be downcast to the correct type
1310        ret.column(0).as_primitive::<Float16Type>();
1311        ret.column(1).as_primitive::<Float16Type>();
1312
1313        Ok(())
1314    }
1315
1316    #[test]
1317    fn test_time_utc_roundtrip() -> Result<()> {
1318        let schema = Arc::new(Schema::new(vec![
1319            Field::new(
1320                "time_millis",
1321                ArrowDataType::Time32(TimeUnit::Millisecond),
1322                true,
1323            )
1324            .with_metadata(HashMap::from_iter(vec![(
1325                "adjusted_to_utc".to_string(),
1326                "".to_string(),
1327            )])),
1328            Field::new(
1329                "time_micros",
1330                ArrowDataType::Time64(TimeUnit::Microsecond),
1331                true,
1332            )
1333            .with_metadata(HashMap::from_iter(vec![(
1334                "adjusted_to_utc".to_string(),
1335                "".to_string(),
1336            )])),
1337        ]));
1338
1339        let mut buf = Vec::with_capacity(1024);
1340        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1341
1342        let original = RecordBatch::try_new(
1343            schema,
1344            vec![
1345                Arc::new(Time32MillisecondArray::from(vec![
1346                    Some(-1),
1347                    Some(0),
1348                    Some(86_399_000),
1349                    Some(86_400_000),
1350                    Some(86_401_000),
1351                    None,
1352                ])),
1353                Arc::new(Time64MicrosecondArray::from(vec![
1354                    Some(-1),
1355                    Some(0),
1356                    Some(86_399 * 1_000_000),
1357                    Some(86_400 * 1_000_000),
1358                    Some(86_401 * 1_000_000),
1359                    None,
1360                ])),
1361            ],
1362        )?;
1363
1364        writer.write(&original)?;
1365        writer.close()?;
1366
1367        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1368        let ret = reader.next().unwrap()?;
1369        assert_eq!(ret, original);
1370
1371        // Ensure can be downcast to the correct type
1372        ret.column(0).as_primitive::<Time32MillisecondType>();
1373        ret.column(1).as_primitive::<Time64MicrosecondType>();
1374
1375        Ok(())
1376    }
1377
1378    #[test]
1379    fn test_date32_roundtrip() -> Result<()> {
1380        use arrow_array::Date32Array;
1381
1382        let schema = Arc::new(Schema::new(vec![Field::new(
1383            "date32",
1384            ArrowDataType::Date32,
1385            false,
1386        )]));
1387
1388        let mut buf = Vec::with_capacity(1024);
1389
1390        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1391
1392        let original = RecordBatch::try_new(
1393            schema,
1394            vec![Arc::new(Date32Array::from(vec![
1395                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1396            ]))],
1397        )?;
1398
1399        writer.write(&original)?;
1400        writer.close()?;
1401
1402        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1403        let ret = reader.next().unwrap()?;
1404        assert_eq!(ret, original);
1405
1406        // Ensure can be downcast to the correct type
1407        ret.column(0).as_primitive::<Date32Type>();
1408
1409        Ok(())
1410    }
1411
1412    #[test]
1413    fn test_date64_roundtrip() -> Result<()> {
1414        use arrow_array::Date64Array;
1415
1416        let schema = Arc::new(Schema::new(vec![
1417            Field::new("small-date64", ArrowDataType::Date64, false),
1418            Field::new("big-date64", ArrowDataType::Date64, false),
1419            Field::new("invalid-date64", ArrowDataType::Date64, false),
1420        ]));
1421
1422        let mut default_buf = Vec::with_capacity(1024);
1423        let mut coerce_buf = Vec::with_capacity(1024);
1424
1425        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1426
1427        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1428        let mut coerce_writer =
1429            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1430
1431        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1432
1433        let original = RecordBatch::try_new(
1434            schema,
1435            vec![
1436                // small-date64
1437                Arc::new(Date64Array::from(vec![
1438                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1439                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1440                    0,
1441                    1_000 * NUM_MILLISECONDS_IN_DAY,
1442                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1443                ])),
1444                // big-date64
1445                Arc::new(Date64Array::from(vec![
1446                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1447                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1448                    0,
1449                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1450                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1451                ])),
1452                // invalid-date64
1453                Arc::new(Date64Array::from(vec![
1454                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1455                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1456                    1,
1457                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1458                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1459                ])),
1460            ],
1461        )?;
1462
1463        default_writer.write(&original)?;
1464        coerce_writer.write(&original)?;
1465
1466        default_writer.close()?;
1467        coerce_writer.close()?;
1468
1469        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1470        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1471
1472        let default_ret = default_reader.next().unwrap()?;
1473        let coerce_ret = coerce_reader.next().unwrap()?;
1474
1475        // Roundtrip should be successful when the default writer is used
1476        assert_eq!(default_ret, original);
1477
1478        // Only small-date64 should roundtrip successfully when coerce_types writer is used
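        // (with coerce_types, Date64 is written as a Parquet DATE in whole days, so sub-day
        // precision and day counts outside the i32 range cannot be recovered)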
1479        assert_eq!(coerce_ret.column(0), original.column(0));
1480        assert_ne!(coerce_ret.column(1), original.column(1));
1481        assert_ne!(coerce_ret.column(2), original.column(2));
1482
1483        // Ensure both can be downcast to the correct type
1484        default_ret.column(0).as_primitive::<Date64Type>();
1485        coerce_ret.column(0).as_primitive::<Date64Type>();
1486
1487        Ok(())
1488    }
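
    /// Generates random fixed-length byte arrays of the requested length for test input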
1489    struct RandFixedLenGen {}
1490
1491    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1492        fn gen(len: i32) -> FixedLenByteArray {
1493            let mut v = vec![0u8; len as usize];
1494            rng().fill_bytes(&mut v);
1495            ByteArray::from(v).into()
1496        }
1497    }
1498
1499    #[test]
1500    fn test_fixed_length_binary_column_reader() {
1501        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1502            20,
1503            ConvertedType::NONE,
1504            None,
1505            |vals| {
1506                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1507                for val in vals {
1508                    match val {
1509                        Some(b) => builder.append_value(b).unwrap(),
1510                        None => builder.append_null(),
1511                    }
1512                }
1513                Arc::new(builder.finish())
1514            },
1515            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1516        );
1517    }
1518
1519    #[test]
1520    fn test_interval_day_time_column_reader() {
1521        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1522            12,
1523            ConvertedType::INTERVAL,
1524            None,
1525            |vals| {
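                // A Parquet INTERVAL is 12 bytes: months, days, and milliseconds, each a
                // little-endian u32; only the day and millisecond fields are used here.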
1526                Arc::new(
1527                    vals.iter()
1528                        .map(|x| {
1529                            x.as_ref().map(|b| IntervalDayTime {
1530                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1531                                milliseconds: i32::from_le_bytes(
1532                                    b.as_ref()[8..12].try_into().unwrap(),
1533                                ),
1534                            })
1535                        })
1536                        .collect::<IntervalDayTimeArray>(),
1537                )
1538            },
1539            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1540        );
1541    }
1542
1543    #[test]
1544    fn test_int96_single_column_reader_test() {
1545        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1546
1547        type TypeHintAndConversionFunction =
1548            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1549
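        // Each entry pairs an optional Arrow type hint with a converter that builds the
        // expected array at that resolution.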
1550        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1551            // Test without a specified ArrowType hint.
1552            (None, |vals: &[Option<Int96>]| {
1553                Arc::new(TimestampNanosecondArray::from_iter(
1554                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1555                )) as ArrayRef
1556            }),
1557            // Test other TimeUnits as ArrowType hints.
1558            (
1559                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1560                |vals: &[Option<Int96>]| {
1561                    Arc::new(TimestampSecondArray::from_iter(
1562                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1563                    )) as ArrayRef
1564                },
1565            ),
1566            (
1567                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1568                |vals: &[Option<Int96>]| {
1569                    Arc::new(TimestampMillisecondArray::from_iter(
1570                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1571                    )) as ArrayRef
1572                },
1573            ),
1574            (
1575                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1576                |vals: &[Option<Int96>]| {
1577                    Arc::new(TimestampMicrosecondArray::from_iter(
1578                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1579                    )) as ArrayRef
1580                },
1581            ),
1582            (
1583                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1584                |vals: &[Option<Int96>]| {
1585                    Arc::new(TimestampNanosecondArray::from_iter(
1586                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1587                    )) as ArrayRef
1588                },
1589            ),
1590            // Test a Timestamp with a timezone as the ArrowType hint.
1591            (
1592                Some(ArrowDataType::Timestamp(
1593                    TimeUnit::Second,
1594                    Some(Arc::from("-05:00")),
1595                )),
1596                |vals: &[Option<Int96>]| {
1597                    Arc::new(
1598                        TimestampSecondArray::from_iter(
1599                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1600                        )
1601                        .with_timezone("-05:00"),
1602                    ) as ArrayRef
1603                },
1604            ),
1605        ];
1606
1607        resolutions.iter().for_each(|(arrow_type, converter)| {
1608            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1609                2,
1610                ConvertedType::NONE,
1611                arrow_type.clone(),
1612                converter,
1613                encodings,
1614            );
1615        })
1616    }
1617
1618    #[test]
1619    fn test_int96_from_spark_file_with_provided_schema() {
1620        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1621        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
1622        // microsecond resolution for the Parquet reader to interpret these values correctly.
1623        use arrow_schema::DataType::Timestamp;
1624        let test_data = arrow::util::test_util::parquet_test_data();
1625        let path = format!("{test_data}/int96_from_spark.parquet");
1626        let file = File::open(path).unwrap();
1627
1628        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1629            "a",
1630            Timestamp(TimeUnit::Microsecond, None),
1631            true,
1632        )]));
1633        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1634
1635        let mut record_reader =
1636            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1637                .unwrap()
1638                .build()
1639                .unwrap();
1640
1641        let batch = record_reader.next().unwrap().unwrap();
1642        assert_eq!(batch.num_columns(), 1);
1643        let column = batch.column(0);
1644        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1645
1646        let expected = Arc::new(Int64Array::from(vec![
1647            Some(1704141296123456),
1648            Some(1704070800000000),
1649            Some(253402225200000000),
1650            Some(1735599600000000),
1651            None,
1652            Some(9089380393200000000),
1653        ]));
1654
1655        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1656        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1657        // anyway, so this should be a zero-copy non-modifying cast.
1658
1659        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1660        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1661
1662        assert_eq!(casted_timestamps.len(), expected.len());
1663
1664        casted_timestamps
1665            .iter()
1666            .zip(expected.iter())
1667            .for_each(|(lhs, rhs)| {
1668                assert_eq!(lhs, rhs);
1669            });
1670    }
1671
1672    #[test]
1673    fn test_int96_from_spark_file_without_provided_schema() {
1674        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1675        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
1676        // values when read as nanosecond resolution overflow and result in garbage values.
1677        use arrow_schema::DataType::Timestamp;
1678        let test_data = arrow::util::test_util::parquet_test_data();
1679        let path = format!("{test_data}/int96_from_spark.parquet");
1680        let file = File::open(path).unwrap();
1681
1682        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1683            .unwrap()
1684            .build()
1685            .unwrap();
1686
1687        let batch = record_reader.next().unwrap().unwrap();
1688        assert_eq!(batch.num_columns(), 1);
1689        let column = batch.column(0);
1690        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1691
1692        let expected = Arc::new(Int64Array::from(vec![
1693            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1694            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1695            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1696            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1697            None,
1698            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1699        ]));
1700
1701        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1702        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1703        // anyway, so this should be a zero-copy non-modifying cast.
1704
1705        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1706        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1707
1708        assert_eq!(casted_timestamps.len(), expected.len());
1709
1710        casted_timestamps
1711            .iter()
1712            .zip(expected.iter())
1713            .for_each(|(lhs, rhs)| {
1714                assert_eq!(lhs, rhs);
1715            });
1716    }
1717
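    /// Generates valid UTF-8 test strings by formatting random `i32` values as decimal text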
1718    struct RandUtf8Gen {}
1719
1720    impl RandGen<ByteArrayType> for RandUtf8Gen {
1721        fn gen(len: i32) -> ByteArray {
1722            Int32Type::gen(len).to_string().as_str().into()
1723        }
1724    }
1725
1726    #[test]
1727    fn test_utf8_single_column_reader_test() {
1728        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1729            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1730                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1731            })))
1732        }
1733
1734        let encodings = &[
1735            Encoding::PLAIN,
1736            Encoding::RLE_DICTIONARY,
1737            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1738            Encoding::DELTA_BYTE_ARRAY,
1739        ];
1740
1741        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1742            2,
1743            ConvertedType::NONE,
1744            None,
1745            |vals| {
1746                Arc::new(BinaryArray::from_iter(
1747                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1748                ))
1749            },
1750            encodings,
1751        );
1752
1753        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1754            2,
1755            ConvertedType::UTF8,
1756            None,
1757            string_converter::<i32>,
1758            encodings,
1759        );
1760
1761        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1762            2,
1763            ConvertedType::UTF8,
1764            Some(ArrowDataType::Utf8),
1765            string_converter::<i32>,
1766            encodings,
1767        );
1768
1769        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1770            2,
1771            ConvertedType::UTF8,
1772            Some(ArrowDataType::LargeUtf8),
1773            string_converter::<i64>,
1774            encodings,
1775        );
1776
1777        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1778        for key in &small_key_types {
1779            for encoding in encodings {
1780                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1781                opts.encoding = *encoding;
1782
1783                let data_type =
1784                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1785
1786                // Cannot run full test suite as keys overflow, run small test instead
1787                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1788                    opts,
1789                    2,
1790                    ConvertedType::UTF8,
1791                    Some(data_type.clone()),
1792                    move |vals| {
1793                        let vals = string_converter::<i32>(vals);
1794                        arrow::compute::cast(&vals, &data_type).unwrap()
1795                    },
1796                );
1797            }
1798        }
1799
1800        let key_types = [
1801            ArrowDataType::Int16,
1802            ArrowDataType::UInt16,
1803            ArrowDataType::Int32,
1804            ArrowDataType::UInt32,
1805            ArrowDataType::Int64,
1806            ArrowDataType::UInt64,
1807        ];
1808
1809        for key in &key_types {
1810            let data_type =
1811                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1812
1813            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1814                2,
1815                ConvertedType::UTF8,
1816                Some(data_type.clone()),
1817                move |vals| {
1818                    let vals = string_converter::<i32>(vals);
1819                    arrow::compute::cast(&vals, &data_type).unwrap()
1820                },
1821                encodings,
1822            );
1823
1824            // https://github.com/apache/arrow-rs/issues/1179
1825            // let data_type = ArrowDataType::Dictionary(
1826            //     Box::new(key.clone()),
1827            //     Box::new(ArrowDataType::LargeUtf8),
1828            // );
1829            //
1830            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1831            //     2,
1832            //     ConvertedType::UTF8,
1833            //     Some(data_type.clone()),
1834            //     move |vals| {
1835            //         let vals = string_converter::<i64>(vals);
1836            //         arrow::compute::cast(&vals, &data_type).unwrap()
1837            //     },
1838            //     encodings,
1839            // );
1840        }
1841    }
1842
1843    #[test]
1844    fn test_decimal_nullable_struct() {
1845        let decimals = Decimal256Array::from_iter_values(
1846            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1847        );
1848
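        // The validity bitmap 0b11101111 is LSB-first, so only row index 4 of the struct is null.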
1849        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1850            "decimals",
1851            decimals.data_type().clone(),
1852            false,
1853        )])))
1854        .len(8)
1855        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1856        .child_data(vec![decimals.into_data()])
1857        .build()
1858        .unwrap();
1859
1860        let written =
1861            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1862                .unwrap();
1863
1864        let mut buffer = Vec::with_capacity(1024);
1865        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1866        writer.write(&written).unwrap();
1867        writer.close().unwrap();
1868
1869        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1870            .unwrap()
1871            .collect::<Result<Vec<_>, _>>()
1872            .unwrap();
1873
1874        assert_eq!(&written.slice(0, 3), &read[0]);
1875        assert_eq!(&written.slice(3, 3), &read[1]);
1876        assert_eq!(&written.slice(6, 2), &read[2]);
1877    }
1878
1879    #[test]
1880    fn test_int32_nullable_struct() {
1881        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1882        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1883            "int32",
1884            int32.data_type().clone(),
1885            false,
1886        )])))
1887        .len(8)
1888        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1889        .child_data(vec![int32.into_data()])
1890        .build()
1891        .unwrap();
1892
1893        let written =
1894            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1895                .unwrap();
1896
1897        let mut buffer = Vec::with_capacity(1024);
1898        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1899        writer.write(&written).unwrap();
1900        writer.close().unwrap();
1901
1902        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1903            .unwrap()
1904            .collect::<Result<Vec<_>, _>>()
1905            .unwrap();
1906
1907        assert_eq!(&written.slice(0, 3), &read[0]);
1908        assert_eq!(&written.slice(3, 3), &read[1]);
1909        assert_eq!(&written.slice(6, 2), &read[2]);
1910    }
1911
1912    #[test]
1913    fn test_decimal_list() {
1914        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1915
1916        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
1917        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1918            decimals.data_type().clone(),
1919            false,
1920        ))))
1921        .len(7)
1922        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1923        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1924        .child_data(vec![decimals.into_data()])
1925        .build()
1926        .unwrap();
1927
1928        let written =
1929            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1930                .unwrap();
1931
1932        let mut buffer = Vec::with_capacity(1024);
1933        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1934        writer.write(&written).unwrap();
1935        writer.close().unwrap();
1936
1937        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1938            .unwrap()
1939            .collect::<Result<Vec<_>, _>>()
1940            .unwrap();
1941
1942        assert_eq!(&written.slice(0, 3), &read[0]);
1943        assert_eq!(&written.slice(3, 3), &read[1]);
1944        assert_eq!(&written.slice(6, 1), &read[2]);
1945    }
1946
1947    #[test]
1948    fn test_read_decimal_file() {
1949        use arrow_array::Decimal128Array;
1950        let testdata = arrow::util::test_util::parquet_test_data();
1951        let file_variants = vec![
1952            ("byte_array", 4),
1953            ("fixed_length", 25),
1954            ("int32", 4),
1955            ("int64", 10),
1956        ];
1957        for (prefix, target_precision) in file_variants {
1958            let path = format!("{testdata}/{prefix}_decimal.parquet");
1959            let file = File::open(path).unwrap();
1960            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1961
1962            let batch = record_reader.next().unwrap().unwrap();
1963            assert_eq!(batch.num_rows(), 24);
1964            let col = batch
1965                .column(0)
1966                .as_any()
1967                .downcast_ref::<Decimal128Array>()
1968                .unwrap();
1969
1970            let expected = 1..25;
1971
1972            assert_eq!(col.precision(), target_precision);
1973            assert_eq!(col.scale(), 2);
1974
1975            for (i, v) in expected.enumerate() {
1976                assert_eq!(col.value(i), v * 100_i128);
1977            }
1978        }
1979    }
1980
1981    #[test]
1982    fn test_read_float16_nonzeros_file() {
1983        use arrow_array::Float16Array;
1984        let testdata = arrow::util::test_util::parquet_test_data();
1985        // see https://github.com/apache/parquet-testing/pull/40
1986        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1987        let file = File::open(path).unwrap();
1988        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1989
1990        let batch = record_reader.next().unwrap().unwrap();
1991        assert_eq!(batch.num_rows(), 8);
1992        let col = batch
1993            .column(0)
1994            .as_any()
1995            .downcast_ref::<Float16Array>()
1996            .unwrap();
1997
1998        let f16_two = f16::ONE + f16::ONE;
1999
2000        assert_eq!(col.null_count(), 1);
2001        assert!(col.is_null(0));
2002        assert_eq!(col.value(1), f16::ONE);
2003        assert_eq!(col.value(2), -f16_two);
2004        assert!(col.value(3).is_nan());
2005        assert_eq!(col.value(4), f16::ZERO);
2006        assert!(col.value(4).is_sign_positive());
2007        assert_eq!(col.value(5), f16::NEG_ONE);
2008        assert_eq!(col.value(6), f16::NEG_ZERO);
2009        assert!(col.value(6).is_sign_negative());
2010        assert_eq!(col.value(7), f16_two);
2011    }
2012
2013    #[test]
2014    fn test_read_float16_zeros_file() {
2015        use arrow_array::Float16Array;
2016        let testdata = arrow::util::test_util::parquet_test_data();
2017        // see https://github.com/apache/parquet-testing/pull/40
2018        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2019        let file = File::open(path).unwrap();
2020        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2021
2022        let batch = record_reader.next().unwrap().unwrap();
2023        assert_eq!(batch.num_rows(), 3);
2024        let col = batch
2025            .column(0)
2026            .as_any()
2027            .downcast_ref::<Float16Array>()
2028            .unwrap();
2029
2030        assert_eq!(col.null_count(), 1);
2031        assert!(col.is_null(0));
2032        assert_eq!(col.value(1), f16::ZERO);
2033        assert!(col.value(1).is_sign_positive());
2034        assert!(col.value(2).is_nan());
2035    }
2036
2037    #[test]
2038    fn test_read_float32_float64_byte_stream_split() {
2039        let path = format!(
2040            "{}/byte_stream_split.zstd.parquet",
2041            arrow::util::test_util::parquet_test_data(),
2042        );
2043        let file = File::open(path).unwrap();
2044        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2045
2046        let mut row_count = 0;
2047        for batch in record_reader {
2048            let batch = batch.unwrap();
2049            row_count += batch.num_rows();
2050            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2051            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2052
2053            // This file contains floats from a standard normal distribution
2054            for &x in f32_col.values() {
2055                assert!(x > -10.0);
2056                assert!(x < 10.0);
2057            }
2058            for &x in f64_col.values() {
2059                assert!(x > -10.0);
2060                assert!(x < 10.0);
2061            }
2062        }
2063        assert_eq!(row_count, 300);
2064    }
2065
2066    #[test]
2067    fn test_read_extended_byte_stream_split() {
2068        let path = format!(
2069            "{}/byte_stream_split_extended.gzip.parquet",
2070            arrow::util::test_util::parquet_test_data(),
2071        );
2072        let file = File::open(path).unwrap();
2073        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2074
2075        let mut row_count = 0;
2076        for batch in record_reader {
2077            let batch = batch.unwrap();
2078            row_count += batch.num_rows();
2079
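            // The file stores each value twice: a conventionally encoded column followed by
            // its BYTE_STREAM_SPLIT counterpart. Check that every pair matches element-wise.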
2080            // 0,1 are f16
2081            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2082            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2083            assert_eq!(f16_col.len(), f16_bss.len());
2084            f16_col
2085                .iter()
2086                .zip(f16_bss.iter())
2087                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2088
2089            // 2,3 are f32
2090            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2091            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2092            assert_eq!(f32_col.len(), f32_bss.len());
2093            f32_col
2094                .iter()
2095                .zip(f32_bss.iter())
2096                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2097
2098            // 4,5 are f64
2099            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2100            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2101            assert_eq!(f64_col.len(), f64_bss.len());
2102            f64_col
2103                .iter()
2104                .zip(f64_bss.iter())
2105                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2106
2107            // 6,7 are i32
2108            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2109            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2110            assert_eq!(i32_col.len(), i32_bss.len());
2111            i32_col
2112                .iter()
2113                .zip(i32_bss.iter())
2114                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2115
2116            // 8,9 are i64
2117            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2118            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2119            assert_eq!(i64_col.len(), i64_bss.len());
2120            i64_col
2121                .iter()
2122                .zip(i64_bss.iter())
2123                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2124
2125            // 10,11 are FLBA(5)
2126            let flba_col = batch.column(10).as_fixed_size_binary();
2127            let flba_bss = batch.column(11).as_fixed_size_binary();
2128            assert_eq!(flba_col.len(), flba_bss.len());
2129            flba_col
2130                .iter()
2131                .zip(flba_bss.iter())
2132                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2133
2134            // 12,13 are FLBA(4) (decimal(7,3))
2135            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2136            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2137            assert_eq!(dec_col.len(), dec_bss.len());
2138            dec_col
2139                .iter()
2140                .zip(dec_bss.iter())
2141                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2142        }
2143        assert_eq!(row_count, 200);
2144    }
2145
2146    #[test]
2147    fn test_read_incorrect_map_schema_file() {
2148        let testdata = arrow::util::test_util::parquet_test_data();
2149        // see https://github.com/apache/parquet-testing/pull/47
2150        let path = format!("{testdata}/incorrect_map_schema.parquet");
2151        let file = File::open(path).unwrap();
2152        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2153
2154        let batch = record_reader.next().unwrap().unwrap();
2155        assert_eq!(batch.num_rows(), 1);
2156
2157        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2158            "my_map",
2159            ArrowDataType::Map(
2160                Arc::new(Field::new(
2161                    "key_value",
2162                    ArrowDataType::Struct(Fields::from(vec![
2163                        Field::new("key", ArrowDataType::Utf8, false),
2164                        Field::new("value", ArrowDataType::Utf8, true),
2165                    ])),
2166                    false,
2167                )),
2168                false,
2169            ),
2170            true,
2171        )]));
2172        assert_eq!(batch.schema().as_ref(), &expected_schema);
2173
2174        assert_eq!(batch.num_rows(), 1);
2175        assert_eq!(batch.column(0).null_count(), 0);
2176        assert_eq!(
2177            batch.column(0).as_map().keys().as_ref(),
2178            &StringArray::from(vec!["parent", "name"])
2179        );
2180        assert_eq!(
2181            batch.column(0).as_map().values().as_ref(),
2182            &StringArray::from(vec!["another", "report"])
2183        );
2184    }
2185
2186    /// Parameters for single_column_reader_test
2187    #[derive(Clone)]
2188    struct TestOptions {
2189        /// Number of row groups to write to parquet (each containing
2190        /// `num_rows` rows)
2191        num_row_groups: usize,
2192        /// Total number of rows per row group
2193        num_rows: usize,
2194        /// Size of batches to read back
2195        record_batch_size: usize,
2196        /// Percentage of nulls in the column, or None if the column is required (no nulls)
2197        null_percent: Option<usize>,
2198        /// Set write batch size
2199        ///
2200        /// This is the number of rows that are written at once to a page and
2201        /// therefore acts as a bound on the page granularity of a row group
2202        write_batch_size: usize,
2203        /// Maximum size of page in bytes
2204        max_data_page_size: usize,
2205        /// Maximum size of dictionary page in bytes
2206        max_dict_page_size: usize,
2207        /// Writer version
2208        writer_version: WriterVersion,
2209        /// Enabled statistics
2210        enabled_statistics: EnabledStatistics,
2211        /// Encoding
2212        encoding: Encoding,
2213        /// row selections and total selected row count
2214        row_selections: Option<(RowSelection, usize)>,
2215        /// row filter
2216        row_filter: Option<Vec<bool>>,
2217        /// limit
2218        limit: Option<usize>,
2219        /// offset
2220        offset: Option<usize>,
2221    }
2222
2223    /// Manually implement Debug to avoid printing the entire contents of row_selections and row_filter
2224    impl std::fmt::Debug for TestOptions {
2225        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2226            f.debug_struct("TestOptions")
2227                .field("num_row_groups", &self.num_row_groups)
2228                .field("num_rows", &self.num_rows)
2229                .field("record_batch_size", &self.record_batch_size)
2230                .field("null_percent", &self.null_percent)
2231                .field("write_batch_size", &self.write_batch_size)
2232                .field("max_data_page_size", &self.max_data_page_size)
2233                .field("max_dict_page_size", &self.max_dict_page_size)
2234                .field("writer_version", &self.writer_version)
2235                .field("enabled_statistics", &self.enabled_statistics)
2236                .field("encoding", &self.encoding)
2237                .field("row_selections", &self.row_selections.is_some())
2238                .field("row_filter", &self.row_filter.is_some())
2239                .field("limit", &self.limit)
2240                .field("offset", &self.offset)
2241                .finish()
2242        }
2243    }
2244
2245    impl Default for TestOptions {
2246        fn default() -> Self {
2247            Self {
2248                num_row_groups: 2,
2249                num_rows: 100,
2250                record_batch_size: 15,
2251                null_percent: None,
2252                write_batch_size: 64,
2253                max_data_page_size: 1024 * 1024,
2254                max_dict_page_size: 1024 * 1024,
2255                writer_version: WriterVersion::PARQUET_1_0,
2256                enabled_statistics: EnabledStatistics::Page,
2257                encoding: Encoding::PLAIN,
2258                row_selections: None,
2259                row_filter: None,
2260                limit: None,
2261                offset: None,
2262            }
2263        }
2264    }
2265
2266    impl TestOptions {
2267        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2268            Self {
2269                num_row_groups,
2270                num_rows,
2271                record_batch_size,
2272                ..Default::default()
2273            }
2274        }
2275
2276        fn with_null_percent(self, null_percent: usize) -> Self {
2277            Self {
2278                null_percent: Some(null_percent),
2279                ..self
2280            }
2281        }
2282
2283        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2284            Self {
2285                max_data_page_size,
2286                ..self
2287            }
2288        }
2289
2290        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2291            Self {
2292                max_dict_page_size,
2293                ..self
2294            }
2295        }
2296
2297        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2298            Self {
2299                enabled_statistics,
2300                ..self
2301            }
2302        }
2303
2304        fn with_row_selections(self) -> Self {
2305            assert!(self.row_filter.is_none(), "Must set row selection first");
2306
2307            let mut rng = rng();
2308            let step = rng.random_range(self.record_batch_size..self.num_rows);
2309            let row_selections = create_test_selection(
2310                step,
2311                self.num_row_groups * self.num_rows,
2312                rng.random::<bool>(),
2313            );
2314            Self {
2315                row_selections: Some(row_selections),
2316                ..self
2317            }
2318        }
2319
2320        fn with_row_filter(self) -> Self {
2321            let row_count = match &self.row_selections {
2322                Some((_, count)) => *count,
2323                None => self.num_row_groups * self.num_rows,
2324            };
2325
2326            let mut rng = rng();
2327            Self {
2328                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2329                ..self
2330            }
2331        }
2332
2333        fn with_limit(self, limit: usize) -> Self {
2334            Self {
2335                limit: Some(limit),
2336                ..self
2337            }
2338        }
2339
2340        fn with_offset(self, offset: usize) -> Self {
2341            Self {
2342                offset: Some(offset),
2343                ..self
2344            }
2345        }
2346
2347        fn writer_props(&self) -> WriterProperties {
2348            let builder = WriterProperties::builder()
2349                .set_data_page_size_limit(self.max_data_page_size)
2350                .set_write_batch_size(self.write_batch_size)
2351                .set_writer_version(self.writer_version)
2352                .set_statistics_enabled(self.enabled_statistics);
2353
2354            let builder = match self.encoding {
2355                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2356                    .set_dictionary_enabled(true)
2357                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2358                _ => builder
2359                    .set_dictionary_enabled(false)
2360                    .set_encoding(self.encoding),
2361            };
2362
2363            builder.build()
2364        }
2365    }
2366
2367    /// Create parquet files and read them back using
2368    /// `ParquetRecordBatchReaderBuilder`, exercising a standard set of
2369    /// `TestOptions`.
2370    ///
2371    /// `rand_max` is the maximum size of value to pass to the
2372    /// value generator
2373    fn run_single_column_reader_tests<T, F, G>(
2374        rand_max: i32,
2375        converted_type: ConvertedType,
2376        arrow_type: Option<ArrowDataType>,
2377        converter: F,
2378        encodings: &[Encoding],
2379    ) where
2380        T: DataType,
2381        G: RandGen<T>,
2382        F: Fn(&[Option<T::T>]) -> ArrayRef,
2383    {
2384        let all_options = vec![
2385            // Choose record_batch_size (15) so batches cross row
2386            // group boundaries (2 row groups of 100 rows each).
2387            TestOptions::new(2, 100, 15),
2388            // Choose record_batch_size (5) so batch boundaries sometimes
2389            // coincide exactly with row group boundaries (3 row groups of
2390            // 25 rows each). Tests buffer
2391            // refilling edge cases.
2392            TestOptions::new(3, 25, 5),
2393            // Choose record_batch_size (25) so batch boundaries always
2394            // align with row group boundaries (4 row groups of 100 rows). Tests buffer
2395            // refilling edge cases.
2396            TestOptions::new(4, 100, 25),
2397            // Set maximum page size so row groups have multiple pages
2398            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2399            // Set small dictionary page size to test dictionary fallback
2400            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2401            // Test optional but with no nulls
2402            TestOptions::new(2, 256, 127).with_null_percent(0),
2403            // Test optional with nulls
2404            TestOptions::new(2, 256, 93).with_null_percent(25),
2405            // Test with limit of 0
2406            TestOptions::new(4, 100, 25).with_limit(0),
2407            // Test with limit of 50
2408            TestOptions::new(4, 100, 25).with_limit(50),
2409            // Test with limit smaller than a single batch
2410            TestOptions::new(4, 100, 25).with_limit(10),
2411            // Test with limit larger than number of rows
2412            TestOptions::new(4, 100, 25).with_limit(101),
2413            // Test with limit + offset ending within a single row group
2414            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2415            // Test with limit + offset equal to number of rows
2416            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2417            // Test with limit + offset larger than number of rows
2418            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2419            // Test with no page-level statistics
2420            TestOptions::new(2, 256, 91)
2421                .with_null_percent(25)
2422                .with_enabled_statistics(EnabledStatistics::Chunk),
2423            // Test with no statistics
2424            TestOptions::new(2, 256, 91)
2425                .with_null_percent(25)
2426                .with_enabled_statistics(EnabledStatistics::None),
2427            // Test with all null
2428            TestOptions::new(2, 128, 91)
2429                .with_null_percent(100)
2430                .with_enabled_statistics(EnabledStatistics::None),
2431            // Test skip
2432
2433            // Choose record_batch_size (15) so batches cross row
2434            // group boundaries (2 row groups of 100 rows each).
2435            TestOptions::new(2, 100, 15).with_row_selections(),
2436            // Choose record_batch_size (5) so batch boundaries sometimes
2437            // coincide exactly with row group boundaries (3 row groups of
2438            // 25 rows each). Tests buffer
2439            // refilling edge cases.
2440            TestOptions::new(3, 25, 5).with_row_selections(),
2441            // Choose record_batch_size (25) so batch boundaries always
2442            // align with row group boundaries (4 row groups of 100 rows). Tests buffer
2443            // refilling edge cases.
2444            TestOptions::new(4, 100, 25).with_row_selections(),
2445            // Set maximum page size so row groups have multiple pages
2446            TestOptions::new(3, 256, 73)
2447                .with_max_data_page_size(128)
2448                .with_row_selections(),
2449            // Set small dictionary page size to test dictionary fallback
2450            TestOptions::new(3, 256, 57)
2451                .with_max_dict_page_size(128)
2452                .with_row_selections(),
2453            // Test optional but with no nulls
2454            TestOptions::new(2, 256, 127)
2455                .with_null_percent(0)
2456                .with_row_selections(),
2457            // Test optional with nulls
2458            TestOptions::new(2, 256, 93)
2459                .with_null_percent(25)
2460                .with_row_selections(),
2461            // Test optional with nulls
2462            TestOptions::new(2, 256, 93)
2463                .with_null_percent(25)
2464                .with_row_selections()
2465                .with_limit(10),
2466            // Test optional with nulls
2467            TestOptions::new(2, 256, 93)
2468                .with_null_percent(25)
2469                .with_row_selections()
2470                .with_offset(20)
2471                .with_limit(10),
2472            // Test filter
2473
2474            // Test with row filter
2475            TestOptions::new(4, 100, 25).with_row_filter(),
2476            // Test with row selection and row filter
2477            TestOptions::new(4, 100, 25)
2478                .with_row_selections()
2479                .with_row_filter(),
2480            // Test with nulls, small pages, and row filter
2481            TestOptions::new(2, 256, 93)
2482                .with_null_percent(25)
2483                .with_max_data_page_size(10)
2484                .with_row_filter(),
2485            // Test with nulls, small pages, row selection, and row filter
2486            TestOptions::new(2, 256, 93)
2487                .with_null_percent(25)
2488                .with_max_data_page_size(10)
2489                .with_row_selections()
2490                .with_row_filter(),
2491            // Test with row selection and no offset index and small pages
2492            TestOptions::new(2, 256, 93)
2493                .with_enabled_statistics(EnabledStatistics::None)
2494                .with_max_data_page_size(10)
2495                .with_row_selections(),
2496        ];
2497
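        // Run every option set against both writer versions and each requested encoding.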
2498        all_options.into_iter().for_each(|opts| {
2499            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2500                for encoding in encodings {
2501                    let opts = TestOptions {
2502                        writer_version,
2503                        encoding: *encoding,
2504                        ..opts.clone()
2505                    };
2506
2507                    single_column_reader_test::<T, _, G>(
2508                        opts,
2509                        rand_max,
2510                        converted_type,
2511                        arrow_type.clone(),
2512                        &converter,
2513                    )
2514                }
2515            }
2516        });
2517    }
2518
2519    /// Create a parquet file and then read it back using
2520    /// `ParquetRecordBatchReaderBuilder` with the parameters described in
2521    /// `opts`.
2522    fn single_column_reader_test<T, F, G>(
2523        opts: TestOptions,
2524        rand_max: i32,
2525        converted_type: ConvertedType,
2526        arrow_type: Option<ArrowDataType>,
2527        converter: F,
2528    ) where
2529        T: DataType,
2530        G: RandGen<T>,
2531        F: Fn(&[Option<T::T>]) -> ArrayRef,
2532    {
2533        // Print out options to facilitate debugging failures on CI
2534        println!(
2535            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2536            T::get_physical_type(), converted_type, arrow_type, opts
2537        );
2538
2539        // Generate definition levels according to null_percent (1 = value present, 0 = null)
2540        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2541            Some(null_percent) => {
2542                let mut rng = rng();
2543
2544                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2545                    .map(|_| {
2546                        std::iter::from_fn(|| {
2547                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2548                        })
2549                        .take(opts.num_rows)
2550                        .collect()
2551                    })
2552                    .collect();
2553                (Repetition::OPTIONAL, Some(def_levels))
2554            }
2555            None => (Repetition::REQUIRED, None),
2556        };
2557
2558        // Generate random values for each row group
2559        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2560            .map(|idx| {
2561                let null_count = match def_levels.as_ref() {
2562                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2563                    None => 0,
2564                };
2565                G::gen_vec(rand_max, opts.num_rows - null_count)
2566            })
2567            .collect();
2568
2569        let len = match T::get_physical_type() {
2570            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2571            crate::basic::Type::INT96 => 12,
2572            _ => -1,
2573        };
2574
2575        let fields = vec![Arc::new(
2576            Type::primitive_type_builder("leaf", T::get_physical_type())
2577                .with_repetition(repetition)
2578                .with_converted_type(converted_type)
2579                .with_length(len)
2580                .build()
2581                .unwrap(),
2582        )];
2583
2584        let schema = Arc::new(
2585            Type::group_type_builder("test_schema")
2586                .with_fields(fields)
2587                .build()
2588                .unwrap(),
2589        );
2590
2591        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2592
2593        let mut file = tempfile::tempfile().unwrap();
2594
2595        generate_single_column_file_with_data::<T>(
2596            &values,
2597            def_levels.as_ref(),
2598            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2599            schema,
2600            arrow_field,
2601            &opts,
2602        )
2603        .unwrap();
2604
2605        file.rewind().unwrap();
2606
2607        let options = ArrowReaderOptions::new()
2608            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2609
2610        let mut builder =
2611            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2612
2613        let expected_data = match opts.row_selections {
2614            Some((selections, row_count)) => {
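                // Despite its name, `skip_data` accumulates the rows kept by the selection;
                // skipped ranges are simply drained from `without_skip_data` and dropped.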
2615                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2616
2617                let mut skip_data: Vec<Option<T::T>> = vec![];
2618                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2619                for select in dequeue {
2620                    if select.skip {
2621                        without_skip_data.drain(0..select.row_count);
2622                    } else {
2623                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2624                    }
2625                }
2626                builder = builder.with_row_selection(selections);
2627
2628                assert_eq!(skip_data.len(), row_count);
2629                skip_data
2630            }
2631            None => {
2632                // Get the flattened table data
2633                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2634                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2635                expected_data
2636            }
2637        };
2638
2639        let mut expected_data = match opts.row_filter {
2640            Some(filter) => {
2641                let expected_data = expected_data
2642                    .into_iter()
2643                    .zip(filter.iter())
2644                    .filter_map(|(d, f)| f.then(|| d))
2645                    .collect();
2646
2647                let mut filter_offset = 0;
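                // Build a stateful predicate that replays the precomputed boolean mask batch
                // by batch, mirroring the filter applied to expected_data above.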
2648                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2649                    ProjectionMask::all(),
2650                    move |b| {
2651                        let array = BooleanArray::from_iter(
2652                            filter
2653                                .iter()
2654                                .skip(filter_offset)
2655                                .take(b.num_rows())
2656                                .map(|x| Some(*x)),
2657                        );
2658                        filter_offset += b.num_rows();
2659                        Ok(array)
2660                    },
2661                ))]);
2662
2663                builder = builder.with_row_filter(filter);
2664                expected_data
2665            }
2666            None => expected_data,
2667        };
2668
2669        if let Some(offset) = opts.offset {
2670            builder = builder.with_offset(offset);
2671            expected_data = expected_data.into_iter().skip(offset).collect();
2672        }
2673
2674        if let Some(limit) = opts.limit {
2675            builder = builder.with_limit(limit);
2676            expected_data = expected_data.into_iter().take(limit).collect();
2677        }
2678
2679        let mut record_reader = builder
2680            .with_batch_size(opts.record_batch_size)
2681            .build()
2682            .unwrap();
2683
2684        let mut total_read = 0;
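        // Drain the reader, checking each batch against the corresponding slice of expected_data.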
2685        loop {
2686            let maybe_batch = record_reader.next();
2687            if total_read < expected_data.len() {
2688                let end = min(total_read + opts.record_batch_size, expected_data.len());
2689                let batch = maybe_batch.unwrap().unwrap();
2690                assert_eq!(end - total_read, batch.num_rows());
2691
2692                let a = converter(&expected_data[total_read..end]);
2693                let b = Arc::clone(batch.column(0));
2694
2695                assert_eq!(a.data_type(), b.data_type());
2696                assert_eq!(a.to_data(), b.to_data());
2697                assert_eq!(
2698                    a.as_any().type_id(),
2699                    b.as_any().type_id(),
2700                    "incorrect type ids"
2701                );
2702
2703                total_read = end;
2704            } else {
2705                assert!(maybe_batch.is_none());
2706                break;
2707            }
2708        }
2709    }
2710
2711    fn gen_expected_data<T: DataType>(
2712        def_levels: Option<&Vec<Vec<i16>>>,
2713        values: &[Vec<T::T>],
2714    ) -> Vec<Option<T::T>> {
2715        let data: Vec<Option<T::T>> = match def_levels {
2716            Some(levels) => {
2717                let mut values_iter = values.iter().flatten();
2718                levels
2719                    .iter()
2720                    .flatten()
2721                    .map(|d| match d {
2722                        1 => Some(values_iter.next().cloned().unwrap()),
2723                        0 => None,
2724                        _ => unreachable!(),
2725                    })
2726                    .collect()
2727            }
2728            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2729        };
2730        data
2731    }
2732
2733    fn generate_single_column_file_with_data<T: DataType>(
2734        values: &[Vec<T::T>],
2735        def_levels: Option<&Vec<Vec<i16>>>,
2736        file: File,
2737        schema: TypePtr,
2738        field: Option<Field>,
2739        opts: &TestOptions,
2740    ) -> Result<crate::format::FileMetaData> {
2741        let mut writer_props = opts.writer_props();
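        // If an Arrow type hint was supplied, embed the serialized Arrow schema in the file
        // metadata so the reader reconstructs that exact Arrow type.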
2742        if let Some(field) = field {
2743            let arrow_schema = Schema::new(vec![field]);
2744            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2745        }
2746
2747        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2748
2749        for (idx, v) in values.iter().enumerate() {
2750            let def_levels = def_levels.map(|d| d[idx].as_slice());
2751            let mut row_group_writer = writer.next_row_group()?;
2752            {
2753                let mut column_writer = row_group_writer
2754                    .next_column()?
2755                    .expect("Column writer is none!");
2756
2757                column_writer
2758                    .typed::<T>()
2759                    .write_batch(v, def_levels, None)?;
2760
2761                column_writer.close()?;
2762            }
2763            row_group_writer.close()?;
2764        }
2765
2766        writer.close()
2767    }
2768
2769    fn get_test_file(file_name: &str) -> File {
2770        let mut path = PathBuf::new();
2771        path.push(arrow::util::test_util::arrow_test_data());
2772        path.push(file_name);
2773
2774        File::open(path.as_path()).expect("File not found!")
2775    }
2776
2777    #[test]
2778    fn test_read_structs() {
2779        // This particular test file has columns of struct types where there is
2780        // a column that has the same name as one of the struct fields
2781        // (see: ARROW-11452)
2782        let testdata = arrow::util::test_util::parquet_test_data();
2783        let path = format!("{testdata}/nested_structs.rust.parquet");
2784        let file = File::open(&path).unwrap();
2785        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2786
2787        for batch in record_batch_reader {
2788            batch.unwrap();
2789        }
2790
2791        let file = File::open(&path).unwrap();
2792        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2793
2794        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2795        let projected_reader = builder
2796            .with_projection(mask)
2797            .with_batch_size(60)
2798            .build()
2799            .unwrap();
2800
2801        let expected_schema = Schema::new(vec![
2802            Field::new(
2803                "roll_num",
2804                ArrowDataType::Struct(Fields::from(vec![Field::new(
2805                    "count",
2806                    ArrowDataType::UInt64,
2807                    false,
2808                )])),
2809                false,
2810            ),
2811            Field::new(
2812                "PC_CUR",
2813                ArrowDataType::Struct(Fields::from(vec![
2814                    Field::new("mean", ArrowDataType::Int64, false),
2815                    Field::new("sum", ArrowDataType::Int64, false),
2816                ])),
2817                false,
2818            ),
2819        ]);
2820
2821        // Tests for #1652 and #1654
2822        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2823
2824        for batch in projected_reader {
2825            let batch = batch.unwrap();
2826            assert_eq!(batch.schema().as_ref(), &expected_schema);
2827        }
2828    }
2829
2830    #[test]
2831    // Same as test_read_structs but constructs the projection mask from column names
2832    fn test_read_structs_by_name() {
2833        let testdata = arrow::util::test_util::parquet_test_data();
2834        let path = format!("{testdata}/nested_structs.rust.parquet");
2835        let file = File::open(&path).unwrap();
2836        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2837
2838        for batch in record_batch_reader {
2839            batch.unwrap();
2840        }
2841
2842        let file = File::open(&path).unwrap();
2843        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2844
2845        let mask = ProjectionMask::columns(
2846            builder.parquet_schema(),
2847            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2848        );
2849        let projected_reader = builder
2850            .with_projection(mask)
2851            .with_batch_size(60)
2852            .build()
2853            .unwrap();
2854
2855        let expected_schema = Schema::new(vec![
2856            Field::new(
2857                "roll_num",
2858                ArrowDataType::Struct(Fields::from(vec![Field::new(
2859                    "count",
2860                    ArrowDataType::UInt64,
2861                    false,
2862                )])),
2863                false,
2864            ),
2865            Field::new(
2866                "PC_CUR",
2867                ArrowDataType::Struct(Fields::from(vec![
2868                    Field::new("mean", ArrowDataType::Int64, false),
2869                    Field::new("sum", ArrowDataType::Int64, false),
2870                ])),
2871                false,
2872            ),
2873        ]);
2874
2875        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2876
2877        for batch in projected_reader {
2878            let batch = batch.unwrap();
2879            assert_eq!(batch.schema().as_ref(), &expected_schema);
2880        }
2881    }
2882
2883    #[test]
2884    fn test_read_maps() {
2885        let testdata = arrow::util::test_util::parquet_test_data();
2886        let path = format!("{testdata}/nested_maps.snappy.parquet");
2887        let file = File::open(path).unwrap();
2888        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2889
2890        for batch in record_batch_reader {
2891            batch.unwrap();
2892        }
2893    }
2894
2895    #[test]
2896    fn test_nested_nullability() {
2897        let message_type = "message nested {
2898          OPTIONAL Group group {
2899            REQUIRED INT32 leaf;
2900          }
2901        }";
2902
2903        let file = tempfile::tempfile().unwrap();
2904        let schema = Arc::new(parse_message_type(message_type).unwrap());
2905
2906        {
2907            // Write using low-level parquet API (#1167)
2908            let mut writer =
2909                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2910                    .unwrap();
2911
2912            {
2913                let mut row_group_writer = writer.next_row_group().unwrap();
2914                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2915
2916                column_writer
2917                    .typed::<Int32Type>()
2918                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2919                    .unwrap();
2920
2921                column_writer.close().unwrap();
2922                row_group_writer.close().unwrap();
2923            }
2924
2925            writer.close().unwrap();
2926        }
2927
2928        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2929        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2930
2931        let reader = builder.with_projection(mask).build().unwrap();
2932
2933        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2934            "group",
2935            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2936            true,
2937        )]));
2938
2939        let batch = reader.into_iter().next().unwrap().unwrap();
2940        assert_eq!(batch.schema().as_ref(), &expected_schema);
2941        assert_eq!(batch.num_rows(), 4);
2942        assert_eq!(batch.column(0).null_count(), 2);
2943    }
2944
2945    #[test]
2946    fn test_invalid_utf8() {
2947        // a parquet file with a single column containing invalid utf8
2948        let data = vec![
2949            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2950            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2951            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2952            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2953            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2954            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2955            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2956            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2957            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2958            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2959            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2960            82, 49,
2961        ];
2962
2963        let file = Bytes::from(data);
2964        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2965
2966        let error = record_batch_reader.next().unwrap().unwrap_err();
2967
2968        assert!(
2969            error.to_string().contains("invalid utf-8 sequence"),
2970            "{}",
2971            error
2972        );
2973    }
2974
2975    #[test]
2976    fn test_invalid_utf8_string_array() {
2977        test_invalid_utf8_string_array_inner::<i32>();
2978    }
2979
2980    #[test]
2981    fn test_invalid_utf8_large_string_array() {
2982        test_invalid_utf8_string_array_inner::<i64>();
2983    }
2984
2985    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2986        let cases = [
2987            invalid_utf8_first_char::<O>(),
2988            invalid_utf8_first_char_long_strings::<O>(),
2989            invalid_utf8_later_char::<O>(),
2990            invalid_utf8_later_char_long_strings::<O>(),
2991            invalid_utf8_later_char_really_long_strings::<O>(),
2992            invalid_utf8_later_char_really_long_strings2::<O>(),
2993        ];
2994        for array in &cases {
2995            for encoding in STRING_ENCODINGS {
2996                // the data is not valid utf8, so we cannot safely construct a correct
2997                // StringArray; purposely create an invalid StringArray instead
2998                let array = unsafe {
2999                    GenericStringArray::<O>::new_unchecked(
3000                        array.offsets().clone(),
3001                        array.values().clone(),
3002                        array.nulls().cloned(),
3003                    )
3004                };
3005                let data_type = array.data_type().clone();
3006                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3007                let err = read_from_parquet(data).unwrap_err();
3008                let expected_err =
3009                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3010                assert!(
3011                    err.to_string().contains(expected_err),
3012                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3013                );
3014            }
3015        }
3016    }
3017
3018    #[test]
3019    fn test_invalid_utf8_string_view_array() {
3020        let cases = [
3021            invalid_utf8_first_char::<i32>(),
3022            invalid_utf8_first_char_long_strings::<i32>(),
3023            invalid_utf8_later_char::<i32>(),
3024            invalid_utf8_later_char_long_strings::<i32>(),
3025            invalid_utf8_later_char_really_long_strings::<i32>(),
3026            invalid_utf8_later_char_really_long_strings2::<i32>(),
3027        ];
3028
3029        for encoding in STRING_ENCODINGS {
3030            for array in &cases {
3031                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3032                let array = array.as_binary_view();
3033
3034                // the data is not valid utf8, so we cannot safely construct a correct
3035                // StringViewArray; purposely create an invalid StringViewArray instead
3036                let array = unsafe {
3037                    StringViewArray::new_unchecked(
3038                        array.views().clone(),
3039                        array.data_buffers().to_vec(),
3040                        array.nulls().cloned(),
3041                    )
3042                };
3043
3044                let data_type = array.data_type().clone();
3045                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3046                let err = read_from_parquet(data).unwrap_err();
3047                let expected_err =
3048                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3049                assert!(
3050                    err.to_string().contains(expected_err),
3051                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3052                );
3053            }
3054        }
3055    }
3056
3057    /// Encodings suitable for string data
3058    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3059        None,
3060        Some(Encoding::PLAIN),
3061        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3062        Some(Encoding::DELTA_BYTE_ARRAY),
3063    ];
3064
3065    /// Invalid UTF-8 sequence in the first character
3066    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3067    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3068
3069    /// Invalid UTF-8 sequence in a character other than the first
3070    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3071    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3072
3073    /// returns a BinaryArray with invalid UTF8 data in the first character
3074    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3075        let valid: &[u8] = b"   ";
3076        let invalid = INVALID_UTF8_FIRST_CHAR;
3077        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3078    }
3079
3080    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3081    /// string larger than 12 bytes which is handled specially when reading
3082    /// `ByteViewArray`s
3083    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3084        let valid: &[u8] = b"   ";
3085        let mut invalid = vec![];
3086        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3087        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3088        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3089    }
3090
3091    /// returns a BinaryArray with invalid UTF8 data in a character other than
3092    /// the first (this is checked in a special codepath)
3093    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3094        let valid: &[u8] = b"   ";
3095        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3096        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3097    }
3098
3099    /// returns a BinaryArray with invalid UTF8 data in a character other than
3100    /// the first in a string larger than 12 bytes which is handled specially
3101    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3102    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3103        let valid: &[u8] = b"   ";
3104        let mut invalid = vec![];
3105        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3106        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3107        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3108    }
3109
3110    /// returns a BinaryArray with invalid UTF8 data in a character other than
3111    /// the first in a string larger than 128 bytes which is handled specially
3112    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3113    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3114        let valid: &[u8] = b"   ";
3115        let mut invalid = vec![];
3116        for _ in 0..10 {
3117            // each instance is 38 bytes
3118            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3119        }
3120        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3121        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3122    }
3123
3124    /// returns a BinaryArray with small invalid UTF8 data (in a character other
3125    /// than the first) followed by a valid string larger than 128 bytes
3126    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3127        let valid: &[u8] = b"   ";
3128        let mut valid_long = vec![];
3129        for _ in 0..10 {
3130            // each instance is 38 bytes
3131            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3132        }
3133        let invalid = INVALID_UTF8_LATER_CHAR;
3134        GenericBinaryArray::<O>::from_iter(vec![
3135            None,
3136            Some(valid),
3137            Some(invalid),
3138            None,
3139            Some(&valid_long),
3140            Some(valid),
3141        ])
3142    }
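
    // Illustrative sketch (not part of the original test suite) of why the helpers
    // above distinguish strings longer than 12 bytes: view arrays such as
    // `StringViewArray` store values of at most 12 bytes inline in the view itself,
    // while longer values live in separate data buffers, so invalid UTF-8 must be
    // caught on both code paths. The exact buffer count asserted below is an
    // assumption about the current arrow implementation.
    #[allow(dead_code)]
    fn byte_view_inline_threshold_sketch() {
        let array = StringViewArray::from(vec![
            "short",                                  // <= 12 bytes: stored inline
            "ThisStringIsCertainlyLongerThan12Bytes", // > 12 bytes: stored in a buffer
        ]);
        // Only the long value needs a backing data buffer
        assert_eq!(array.data_buffers().len(), 1);
    }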
3143
3144    /// writes the array into a single column parquet file with the specified
3145    /// encoding.
3146    ///
3147    /// If no encoding is specified, the default (dictionary) encoding is used
3148    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3149        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3150        let mut data = vec![];
3151        let schema = batch.schema();
3152        let props = encoding.map(|encoding| {
3153            WriterProperties::builder()
3154                // dictionary encoding must be disabled for the requested encoding to take effect
3155                .set_dictionary_enabled(false)
3156                .set_encoding(encoding)
3157                .build()
3158        });
3159
3160        {
3161            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3162            writer.write(&batch).unwrap();
3163            writer.flush().unwrap();
3164            writer.close().unwrap();
3165        };
3166        data
3167    }
3168
3169    /// read the parquet file into a record batch
3170    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3171        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3172            .unwrap()
3173            .build()
3174            .unwrap();
3175
3176        reader.collect()
3177    }
3178
3179    #[test]
3180    fn test_dictionary_preservation() {
3181        let fields = vec![Arc::new(
3182            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3183                .with_repetition(Repetition::OPTIONAL)
3184                .with_converted_type(ConvertedType::UTF8)
3185                .build()
3186                .unwrap(),
3187        )];
3188
3189        let schema = Arc::new(
3190            Type::group_type_builder("test_schema")
3191                .with_fields(fields)
3192                .build()
3193                .unwrap(),
3194        );
3195
3196        let dict_type = ArrowDataType::Dictionary(
3197            Box::new(ArrowDataType::Int32),
3198            Box::new(ArrowDataType::Utf8),
3199        );
3200
3201        let arrow_field = Field::new("leaf", dict_type, true);
3202
3203        let mut file = tempfile::tempfile().unwrap();
3204
3205        let values = vec![
3206            vec![
3207                ByteArray::from("hello"),
3208                ByteArray::from("a"),
3209                ByteArray::from("b"),
3210                ByteArray::from("d"),
3211            ],
3212            vec![
3213                ByteArray::from("c"),
3214                ByteArray::from("a"),
3215                ByteArray::from("b"),
3216            ],
3217        ];
3218
3219        let def_levels = vec![
3220            vec![1, 0, 0, 1, 0, 0, 1, 1],
3221            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3222        ];
3223
3224        let opts = TestOptions {
3225            encoding: Encoding::RLE_DICTIONARY,
3226            ..Default::default()
3227        };
3228
3229        generate_single_column_file_with_data::<ByteArrayType>(
3230            &values,
3231            Some(&def_levels),
3232            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3233            schema,
3234            Some(arrow_field),
3235            &opts,
3236        )
3237        .unwrap();
3238
3239        file.rewind().unwrap();
3240
3241        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3242
3243        let batches = record_reader
3244            .collect::<Result<Vec<RecordBatch>, _>>()
3245            .unwrap();
3246
3247        assert_eq!(batches.len(), 6);
3248        assert!(batches.iter().all(|x| x.num_columns() == 1));
3249
3250        let row_counts = batches
3251            .iter()
3252            .map(|x| (x.num_rows(), x.column(0).null_count()))
3253            .collect::<Vec<_>>();
3254
3255        assert_eq!(
3256            row_counts,
3257            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3258        );
3259
3260        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3261
3262        // First and second batch in same row group -> same dictionary
3263        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3264        // Third batch spans a row group boundary -> computed dictionary
3265        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3266        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3267        // Fourth, fifth and sixth from same row group -> same dictionary
3268        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3269        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3270    }
3271
3272    #[test]
3273    fn test_read_null_list() {
3274        let testdata = arrow::util::test_util::parquet_test_data();
3275        let path = format!("{testdata}/null_list.parquet");
3276        let file = File::open(path).unwrap();
3277        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3278
3279        let batch = record_batch_reader.next().unwrap().unwrap();
3280        assert_eq!(batch.num_rows(), 1);
3281        assert_eq!(batch.num_columns(), 1);
3282        assert_eq!(batch.column(0).len(), 1);
3283
3284        let list = batch
3285            .column(0)
3286            .as_any()
3287            .downcast_ref::<ListArray>()
3288            .unwrap();
3289        assert_eq!(list.len(), 1);
3290        assert!(list.is_valid(0));
3291
3292        let val = list.value(0);
3293        assert_eq!(val.len(), 0);
3294    }
3295
3296    #[test]
3297    fn test_null_schema_inference() {
3298        let testdata = arrow::util::test_util::parquet_test_data();
3299        let path = format!("{testdata}/null_list.parquet");
3300        let file = File::open(path).unwrap();
3301
3302        let arrow_field = Field::new(
3303            "emptylist",
3304            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3305            true,
3306        );
3307
3308        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3309        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3310        let schema = builder.schema();
3311        assert_eq!(schema.fields().len(), 1);
3312        assert_eq!(schema.field(0), &arrow_field);
3313    }
3314
3315    #[test]
3316    fn test_skip_metadata() {
3317        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3318        let field = Field::new("col", col.data_type().clone(), true);
3319
3320        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3321
3322        let metadata = [("key".to_string(), "value".to_string())]
3323            .into_iter()
3324            .collect();
3325
3326        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3327
3328        assert_ne!(schema_with_metadata, schema_without_metadata);
3329
3330        let batch =
3331            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3332
3333        let file = |version: WriterVersion| {
3334            let props = WriterProperties::builder()
3335                .set_writer_version(version)
3336                .build();
3337
3338            let file = tempfile().unwrap();
3339            let mut writer =
3340                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3341                    .unwrap();
3342            writer.write(&batch).unwrap();
3343            writer.close().unwrap();
3344            file
3345        };
3346
3347        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3348
3349        let v1_reader = file(WriterVersion::PARQUET_1_0);
3350        let v2_reader = file(WriterVersion::PARQUET_2_0);
3351
3352        let arrow_reader =
3353            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3354        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3355
3356        let reader =
3357            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3358                .unwrap()
3359                .build()
3360                .unwrap();
3361        assert_eq!(reader.schema(), schema_without_metadata);
3362
3363        let arrow_reader =
3364            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3365        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3366
3367        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3368            .unwrap()
3369            .build()
3370            .unwrap();
3371        assert_eq!(reader.schema(), schema_without_metadata);
3372    }
3373
3374    fn write_parquet_from_iter<I, F>(value: I) -> File
3375    where
3376        I: IntoIterator<Item = (F, ArrayRef)>,
3377        F: AsRef<str>,
3378    {
3379        let batch = RecordBatch::try_from_iter(value).unwrap();
3380        let file = tempfile().unwrap();
3381        let mut writer =
3382            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3383        writer.write(&batch).unwrap();
3384        writer.close().unwrap();
3385        file
3386    }
3387
3388    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3389    where
3390        I: IntoIterator<Item = (F, ArrayRef)>,
3391        F: AsRef<str>,
3392    {
3393        let file = write_parquet_from_iter(value);
3394        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3395        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3396            file.try_clone().unwrap(),
3397            options_with_schema,
3398        );
3399        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3400    }
3401
3402    #[test]
3403    fn test_schema_too_few_columns() {
3404        run_schema_test_with_error(
3405            vec![
3406                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3407                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3408            ],
3409            Arc::new(Schema::new(vec![Field::new(
3410                "int64",
3411                ArrowDataType::Int64,
3412                false,
3413            )])),
3414            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3415        );
3416    }
3417
3418    #[test]
3419    fn test_schema_too_many_columns() {
3420        run_schema_test_with_error(
3421            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3422            Arc::new(Schema::new(vec![
3423                Field::new("int64", ArrowDataType::Int64, false),
3424                Field::new("int32", ArrowDataType::Int32, false),
3425            ])),
3426            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3427        );
3428    }
3429
3430    #[test]
3431    fn test_schema_mismatched_column_names() {
3432        run_schema_test_with_error(
3433            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3434            Arc::new(Schema::new(vec![Field::new(
3435                "other",
3436                ArrowDataType::Int64,
3437                false,
3438            )])),
3439            "Arrow: incompatible arrow schema, expected field named int64 got other",
3440        );
3441    }
3442
3443    #[test]
3444    fn test_schema_incompatible_columns() {
3445        run_schema_test_with_error(
3446            vec![
3447                (
3448                    "col1_invalid",
3449                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3450                ),
3451                (
3452                    "col2_valid",
3453                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3454                ),
3455                (
3456                    "col3_invalid",
3457                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3458                ),
3459            ],
3460            Arc::new(Schema::new(vec![
3461                Field::new("col1_invalid", ArrowDataType::Int32, false),
3462                Field::new("col2_valid", ArrowDataType::Int32, false),
3463                Field::new("col3_invalid", ArrowDataType::Int32, false),
3464            ])),
3465            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3466        );
3467    }
3468
3469    #[test]
3470    fn test_one_incompatible_nested_column() {
3471        let nested_fields = Fields::from(vec![
3472            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3473            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3474        ]);
3475        let nested = StructArray::try_new(
3476            nested_fields,
3477            vec![
3478                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3479                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3480            ],
3481            None,
3482        )
3483        .expect("struct array");
3484        let supplied_nested_fields = Fields::from(vec![
3485            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3486            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3487        ]);
3488        run_schema_test_with_error(
3489            vec![
3490                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3491                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3492                ("nested", Arc::new(nested) as ArrayRef),
3493            ],
3494            Arc::new(Schema::new(vec![
3495                Field::new("col1", ArrowDataType::Int64, false),
3496                Field::new("col2", ArrowDataType::Int32, false),
3497                Field::new(
3498                    "nested",
3499                    ArrowDataType::Struct(supplied_nested_fields),
3500                    false,
3501                ),
3502            ])),
3503            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3504        );
3505    }
3506
3507    #[test]
3508    fn test_read_binary_as_utf8() {
3509        let file = write_parquet_from_iter(vec![
3510            (
3511                "binary_to_utf8",
3512                Arc::new(BinaryArray::from(vec![
3513                    b"one".as_ref(),
3514                    b"two".as_ref(),
3515                    b"three".as_ref(),
3516                ])) as ArrayRef,
3517            ),
3518            (
3519                "large_binary_to_large_utf8",
3520                Arc::new(LargeBinaryArray::from(vec![
3521                    b"one".as_ref(),
3522                    b"two".as_ref(),
3523                    b"three".as_ref(),
3524                ])) as ArrayRef,
3525            ),
3526            (
3527                "binary_view_to_utf8_view",
3528                Arc::new(BinaryViewArray::from(vec![
3529                    b"one".as_ref(),
3530                    b"two".as_ref(),
3531                    b"three".as_ref(),
3532                ])) as ArrayRef,
3533            ),
3534        ]);
3535        let supplied_fields = Fields::from(vec![
3536            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3537            Field::new(
3538                "large_binary_to_large_utf8",
3539                ArrowDataType::LargeUtf8,
3540                false,
3541            ),
3542            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3543        ]);
3544
3545        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3546        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3547            file.try_clone().unwrap(),
3548            options,
3549        )
3550        .expect("reader builder with schema")
3551        .build()
3552        .expect("reader with schema");
3553
3554        let batch = arrow_reader.next().unwrap().unwrap();
3555        assert_eq!(batch.num_columns(), 3);
3556        assert_eq!(batch.num_rows(), 3);
3557        assert_eq!(
3558            batch
3559                .column(0)
3560                .as_string::<i32>()
3561                .iter()
3562                .collect::<Vec<_>>(),
3563            vec![Some("one"), Some("two"), Some("three")]
3564        );
3565
3566        assert_eq!(
3567            batch
3568                .column(1)
3569                .as_string::<i64>()
3570                .iter()
3571                .collect::<Vec<_>>(),
3572            vec![Some("one"), Some("two"), Some("three")]
3573        );
3574
3575        assert_eq!(
3576            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3577            vec![Some("one"), Some("two"), Some("three")]
3578        );
3579    }
3580
3581    #[test]
3582    #[should_panic(expected = "Invalid UTF8 sequence at")]
3583    fn test_read_non_utf8_binary_as_utf8() {
3584        let file = write_parquet_from_iter(vec![(
3585            "non_utf8_binary",
3586            Arc::new(BinaryArray::from(vec![
3587                b"\xDE\x00\xFF".as_ref(),
3588                b"\xDE\x01\xAA".as_ref(),
3589                b"\xDE\x02\xFF".as_ref(),
3590            ])) as ArrayRef,
3591        )]);
3592        let supplied_fields = Fields::from(vec![Field::new(
3593            "non_utf8_binary",
3594            ArrowDataType::Utf8,
3595            false,
3596        )]);
3597
3598        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3599        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3600            file.try_clone().unwrap(),
3601            options,
3602        )
3603        .expect("reader builder with schema")
3604        .build()
3605        .expect("reader with schema");
3606        arrow_reader.next().unwrap().unwrap_err();
3607    }
3608
3609    #[test]
3610    fn test_with_schema() {
3611        let nested_fields = Fields::from(vec![
3612            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3613            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3614        ]);
3615
3616        let nested_arrays: Vec<ArrayRef> = vec![
3617            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3618            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3619        ];
3620
3621        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3622
3623        let file = write_parquet_from_iter(vec![
3624            (
3625                "int32_to_ts_second",
3626                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3627            ),
3628            (
3629                "date32_to_date64",
3630                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3631            ),
3632            ("nested", Arc::new(nested) as ArrayRef),
3633        ]);
3634
3635        let supplied_nested_fields = Fields::from(vec![
3636            Field::new(
3637                "utf8_to_dict",
3638                ArrowDataType::Dictionary(
3639                    Box::new(ArrowDataType::Int32),
3640                    Box::new(ArrowDataType::Utf8),
3641                ),
3642                false,
3643            ),
3644            Field::new(
3645                "int64_to_ts_nano",
3646                ArrowDataType::Timestamp(
3647                    arrow::datatypes::TimeUnit::Nanosecond,
3648                    Some("+10:00".into()),
3649                ),
3650                false,
3651            ),
3652        ]);
3653
3654        let supplied_schema = Arc::new(Schema::new(vec![
3655            Field::new(
3656                "int32_to_ts_second",
3657                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3658                false,
3659            ),
3660            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3661            Field::new(
3662                "nested",
3663                ArrowDataType::Struct(supplied_nested_fields),
3664                false,
3665            ),
3666        ]));
3667
3668        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3669        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3670            file.try_clone().unwrap(),
3671            options,
3672        )
3673        .expect("reader builder with schema")
3674        .build()
3675        .expect("reader with schema");
3676
3677        assert_eq!(arrow_reader.schema(), supplied_schema);
3678        let batch = arrow_reader.next().unwrap().unwrap();
3679        assert_eq!(batch.num_columns(), 3);
3680        assert_eq!(batch.num_rows(), 4);
3681        assert_eq!(
3682            batch
3683                .column(0)
3684                .as_any()
3685                .downcast_ref::<TimestampSecondArray>()
3686                .expect("downcast to timestamp second")
3687                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3688                .map(|v| v.to_string())
3689                .expect("value as datetime"),
3690            "1970-01-01 01:00:00 +01:00"
3691        );
3692        assert_eq!(
3693            batch
3694                .column(1)
3695                .as_any()
3696                .downcast_ref::<Date64Array>()
3697                .expect("downcast to date64")
3698                .value_as_date(0)
3699                .map(|v| v.to_string())
3700                .expect("value as date"),
3701            "1970-01-01"
3702        );
3703
3704        let nested = batch
3705            .column(2)
3706            .as_any()
3707            .downcast_ref::<StructArray>()
3708            .expect("downcast to struct");
3709
3710        let nested_dict = nested
3711            .column(0)
3712            .as_any()
3713            .downcast_ref::<Int32DictionaryArray>()
3714            .expect("downcast to dictionary");
3715
3716        assert_eq!(
3717            nested_dict
3718                .values()
3719                .as_any()
3720                .downcast_ref::<StringArray>()
3721                .expect("downcast to string")
3722                .iter()
3723                .collect::<Vec<_>>(),
3724            vec![Some("a"), Some("b")]
3725        );
3726
3727        assert_eq!(
3728            nested_dict.keys().iter().collect::<Vec<_>>(),
3729            vec![Some(0), Some(0), Some(0), Some(1)]
3730        );
3731
3732        assert_eq!(
3733            nested
3734                .column(1)
3735                .as_any()
3736                .downcast_ref::<TimestampNanosecondArray>()
3737                .expect("downcast to timestamp nanosecond")
3738                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3739                .map(|v| v.to_string())
3740                .expect("value as datetime"),
3741            "1970-01-01 10:00:00.000000001 +10:00"
3742        );
3743    }
3744
3745    #[test]
3746    fn test_empty_projection() {
3747        let testdata = arrow::util::test_util::parquet_test_data();
3748        let path = format!("{testdata}/alltypes_plain.parquet");
3749        let file = File::open(path).unwrap();
3750
3751        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3752        let file_metadata = builder.metadata().file_metadata();
3753        let expected_rows = file_metadata.num_rows() as usize;
3754
3755        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3756        let batch_reader = builder
3757            .with_projection(mask)
3758            .with_batch_size(2)
3759            .build()
3760            .unwrap();
3761
3762        let mut total_rows = 0;
3763        for maybe_batch in batch_reader {
3764            let batch = maybe_batch.unwrap();
3765            total_rows += batch.num_rows();
3766            assert_eq!(batch.num_columns(), 0);
3767            assert!(batch.num_rows() <= 2);
3768        }
3769
3770        assert_eq!(total_rows, expected_rows);
3771    }
3772
3773    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3774        let schema = Arc::new(Schema::new(vec![Field::new(
3775            "list",
3776            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3777            true,
3778        )]));
3779
3780        let mut buf = Vec::with_capacity(1024);
3781
3782        let mut writer = ArrowWriter::try_new(
3783            &mut buf,
3784            schema.clone(),
3785            Some(
3786                WriterProperties::builder()
3787                    .set_max_row_group_size(row_group_size)
3788                    .build(),
3789            ),
3790        )
3791        .unwrap();
3792        for _ in 0..2 {
3793            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3794            for _ in 0..(batch_size) {
3795                list_builder.append(true);
3796            }
3797            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3798                .unwrap();
3799            writer.write(&batch).unwrap();
3800        }
3801        writer.close().unwrap();
3802
3803        let mut record_reader =
3804            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3805        assert_eq!(
3806            batch_size,
3807            record_reader.next().unwrap().unwrap().num_rows()
3808        );
3809        assert_eq!(
3810            batch_size,
3811            record_reader.next().unwrap().unwrap().num_rows()
3812        );
3813    }
3814
3815    #[test]
3816    fn test_row_group_exact_multiple() {
3817        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3818        test_row_group_batch(8, 8);
3819        test_row_group_batch(10, 8);
3820        test_row_group_batch(8, 10);
3821        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3822        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3823        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3824        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3825        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3826    }
3827
3828    /// Given a RecordBatch containing all the column data, return the expected batches given
3829    /// a `batch_size` and `selection`
3830    fn get_expected_batches(
3831        column: &RecordBatch,
3832        selection: &RowSelection,
3833        batch_size: usize,
3834    ) -> Vec<RecordBatch> {
3835        let mut expected_batches = vec![];
3836
3837        let mut selection: VecDeque<_> = selection.clone().into();
3838        let mut row_offset = 0;
3839        let mut last_start = None;
3840        while row_offset < column.num_rows() && !selection.is_empty() {
3841            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3842            while batch_remaining > 0 && !selection.is_empty() {
3843                let (to_read, skip) = match selection.front_mut() {
3844                    Some(selection) if selection.row_count > batch_remaining => {
3845                        selection.row_count -= batch_remaining;
3846                        (batch_remaining, selection.skip)
3847                    }
3848                    Some(_) => {
3849                        let select = selection.pop_front().unwrap();
3850                        (select.row_count, select.skip)
3851                    }
3852                    None => break,
3853                };
3854
3855                batch_remaining -= to_read;
3856
3857                match skip {
3858                    true => {
3859                        if let Some(last_start) = last_start.take() {
3860                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3861                        }
3862                        row_offset += to_read
3863                    }
3864                    false => {
3865                        last_start.get_or_insert(row_offset);
3866                        row_offset += to_read
3867                    }
3868                }
3869            }
3870        }
3871
3872        if let Some(last_start) = last_start.take() {
3873            expected_batches.push(column.slice(last_start, row_offset - last_start))
3874        }
3875
3876        // Sanity check: all batches except the final one should contain `batch_size` rows
3877        for batch in &expected_batches[..expected_batches.len() - 1] {
3878            assert_eq!(batch.num_rows(), batch_size);
3879        }
3880
3881        expected_batches
3882    }
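
    // Minimal sketch (not part of the original test suite) of `get_expected_batches`:
    // with 4 rows, a selection of "select 2, skip 1, select 1" and a batch size of 2,
    // the first two rows form one expected batch and the final row forms another.
    #[allow(dead_code)]
    fn get_expected_batches_sketch() {
        let column = RecordBatch::try_from_iter([(
            "c",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        )])
        .unwrap();
        let selection: RowSelection = vec![
            RowSelector::select(2),
            RowSelector::skip(1),
            RowSelector::select(1),
        ]
        .into();
        let expected = get_expected_batches(&column, &selection, 2);
        assert_eq!(expected, vec![column.slice(0, 2), column.slice(3, 1)]);
    }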
3883
3884    fn create_test_selection(
3885        step_len: usize,
3886        total_len: usize,
3887        skip_first: bool,
3888    ) -> (RowSelection, usize) {
3889        let mut remaining = total_len;
3890        let mut skip = skip_first;
3891        let mut vec = vec![];
3892        let mut selected_count = 0;
3893        while remaining != 0 {
3894            let step = if remaining > step_len {
3895                step_len
3896            } else {
3897                remaining
3898            };
3899            vec.push(RowSelector {
3900                row_count: step,
3901                skip,
3902            });
3903            remaining -= step;
3904            if !skip {
3905                selected_count += step;
3906            }
3907            skip = !skip;
3908        }
3909        (vec.into(), selected_count)
3910    }
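
    // Illustrative sketch (not part of the original test suite): with a step of 2
    // over 5 total rows and `skip_first = false`, `create_test_selection` alternates
    // select/skip selectors until the rows are exhausted, selecting 3 rows in total.
    #[allow(dead_code)]
    fn create_test_selection_sketch() {
        let (selection, selected) = create_test_selection(2, 5, false);
        let expected: RowSelection = vec![
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(1),
        ]
        .into();
        assert_eq!(selection, expected);
        assert_eq!(selected, 3);
    }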
3911
3912    #[test]
3913    fn test_scan_row_with_selection() {
3914        let testdata = arrow::util::test_util::parquet_test_data();
3915        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3916        let test_file = File::open(&path).unwrap();
3917
3918        let mut serial_reader =
3919            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3920        let data = serial_reader.next().unwrap().unwrap();
3921
3922        let do_test = |batch_size: usize, selection_len: usize| {
3923            for skip_first in [false, true] {
3924                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3925
3926                let expected = get_expected_batches(&data, &selections, batch_size);
3927                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3928                assert_eq!(
3929                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3930                    expected,
3931                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3932                );
3933            }
3934        };
3935
3936        // total row count 7300
3937        // 1. test selection len more than one page row count
3938        do_test(1000, 1000);
3939
3940        // 2. test selection len less than one page row count
3941        do_test(20, 20);
3942
3943        // 3. test selection_len less than batch_size
3944        do_test(20, 5);
3945
3946        // 4. test selection_len more than batch_size
3947        // (i.e. batch_size < selection_len)
3948        do_test(5, 20);
3949
3950        fn create_skip_reader(
3951            test_file: &File,
3952            batch_size: usize,
3953            selections: RowSelection,
3954        ) -> ParquetRecordBatchReader {
3955            let options = ArrowReaderOptions::new().with_page_index(true);
3956            let file = test_file.try_clone().unwrap();
3957            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3958                .unwrap()
3959                .with_batch_size(batch_size)
3960                .with_row_selection(selections)
3961                .build()
3962                .unwrap()
3963        }
3964    }
3965
3966    #[test]
3967    fn test_batch_size_overallocate() {
3968        let testdata = arrow::util::test_util::parquet_test_data();
3969        // `alltypes_plain.parquet` only has 8 rows
3970        let path = format!("{testdata}/alltypes_plain.parquet");
3971        let test_file = File::open(path).unwrap();
3972
3973        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3974        let num_rows = builder.metadata.file_metadata().num_rows();
3975        let reader = builder
3976            .with_batch_size(1024)
3977            .with_projection(ProjectionMask::all())
3978            .build()
3979            .unwrap();
3980        assert_ne!(1024, num_rows);
3981        assert_eq!(reader.batch_size, num_rows as usize);
3982    }
3983
3984    #[test]
3985    fn test_read_with_page_index_enabled() {
3986        let testdata = arrow::util::test_util::parquet_test_data();
3987
3988        {
3989            // `alltypes_tiny_pages.parquet` has page index
3990            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3991            let test_file = File::open(path).unwrap();
3992            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3993                test_file,
3994                ArrowReaderOptions::new().with_page_index(true),
3995            )
3996            .unwrap();
3997            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3998            let reader = builder.build().unwrap();
3999            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4000            assert_eq!(batches.len(), 8);
4001        }
4002
4003        {
4004            // `alltypes_plain.parquet` doesn't have page index
4005            let path = format!("{testdata}/alltypes_plain.parquet");
4006            let test_file = File::open(path).unwrap();
4007            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4008                test_file,
4009                ArrowReaderOptions::new().with_page_index(true),
4010            )
4011            .unwrap();
4012            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4013            // we should still read the file successfully.
4014            assert!(builder.metadata().offset_index().is_none());
4015            let reader = builder.build().unwrap();
4016            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4017            assert_eq!(batches.len(), 1);
4018        }
4019    }
4020
4021    #[test]
4022    fn test_raw_repetition() {
4023        const MESSAGE_TYPE: &str = "
4024            message Log {
4025              OPTIONAL INT32 eventType;
4026              REPEATED INT32 category;
4027              REPEATED group filter {
4028                OPTIONAL INT32 error;
4029              }
4030            }
4031        ";
4032        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4033        let props = Default::default();
4034
4035        let mut buf = Vec::with_capacity(1024);
4036        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4037        let mut row_group_writer = writer.next_row_group().unwrap();
4038
4039        // column 0
4040        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4041        col_writer
4042            .typed::<Int32Type>()
4043            .write_batch(&[1], Some(&[1]), None)
4044            .unwrap();
4045        col_writer.close().unwrap();
4046        // column 1
4047        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4048        col_writer
4049            .typed::<Int32Type>()
4050            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4051            .unwrap();
4052        col_writer.close().unwrap();
4053        // column 2
4054        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4055        col_writer
4056            .typed::<Int32Type>()
4057            .write_batch(&[1], Some(&[1]), Some(&[0]))
4058            .unwrap();
4059        col_writer.close().unwrap();
4060
4061        let rg_md = row_group_writer.close().unwrap();
4062        assert_eq!(rg_md.num_rows(), 1);
4063        writer.close().unwrap();
4064
4065        let bytes = Bytes::from(buf);
4066
4067        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4068        let full = no_mask.next().unwrap().unwrap();
4069
4070        assert_eq!(full.num_columns(), 3);
4071
4072        for idx in 0..3 {
4073            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4074            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4075            let mut reader = b.with_projection(mask).build().unwrap();
4076            let projected = reader.next().unwrap().unwrap();
4077
4078            assert_eq!(projected.num_columns(), 1);
4079            assert_eq!(full.column(idx), projected.column(0));
4080        }
4081    }
4082
4083    #[test]
4084    fn test_read_lz4_raw() {
4085        let testdata = arrow::util::test_util::parquet_test_data();
4086        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4087        let file = File::open(path).unwrap();
4088
4089        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4090            .unwrap()
4091            .collect::<Result<Vec<_>, _>>()
4092            .unwrap();
4093        assert_eq!(batches.len(), 1);
4094        let batch = &batches[0];
4095
4096        assert_eq!(batch.num_columns(), 3);
4097        assert_eq!(batch.num_rows(), 4);
4098
4099        // https://github.com/apache/parquet-testing/pull/18
4100        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4101        assert_eq!(
4102            a.values(),
4103            &[1593604800, 1593604800, 1593604801, 1593604801]
4104        );
4105
4106        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4107        let a: Vec<_> = a.iter().flatten().collect();
4108        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4109
4110        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4111        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4112    }
4113
4114    // This test ensures backward compatibility: it reads 2 files that use the LZ4 CompressionCodec
4115    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4116    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec which uses the
4117    //    LZ4_HADOOP algorithm for compression.
4118    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec which uses the
4119    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4120    //    older parquet-cpp versions.
4121    //
4122    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

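    // Reads hadoop_lz4_compressed_larger.parquet (a single string column with 10000 rows,
    // LZ4_HADOOP codec) and spot-checks values at the start and end of the column.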
    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

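    // Reads nested_lists.snappy.parquet twice: once in full, and once with a RowSelection
    // keeping only the middle row, then checks the selected row matches the full read.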
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

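    // Round-trips Decimal128 columns with several precision/scale combinations
    // ((19, 0), (12, 0) and (17, 10)) through the Arrow writer and reader.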
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

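    // Skips rows of a list column whose first data page holds more values than rows:
    // `RowSelector::skip(2)` followed by `RowSelector::select(1)` keeps only the last row.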
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // First page contains 2 values but only 1 row
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

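    // Writes decimal columns of increasing precision and verifies both the physical type
    // chosen by the writer and that the values round-trip unchanged.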
    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9 -> INT32
        // Precision <= 18 -> INT64
        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY

        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

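    // Applies a RowSelection to a file of single-element lists and verifies that both the
    // offsets and the underlying string values of the selected rows are correct.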
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

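    // Fuzz test: builds a random list<list<int32>> column (with nulls at every level) and
    // checks that several skip/select patterns and batch sizes return the matching slices
    // of the original batch.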
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

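    // Reads old_list_structure.parquet, which uses the legacy two-level list encoding,
    // and checks the first row decodes to the expected list<list<int32>> value.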
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        // message my_record {
        //     REQUIRED group a (LIST) {
        //         REPEATED group array (LIST) {
        //             REPEATED INT32 array;
        //         }
        //     }
        // }
        // should be read as list<list<int32>>
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // create expected ListArray
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // grab first column
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        // get first row: [[1, 2], [3, 4]]
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

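    // Reads map_no_value.parquet, where one MAP group has no value field; such a map is
    // decoded as a list of keys and should match the plain LIST column in the same file.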
    #[test]
    fn test_map_no_value() {
        // File schema:
        // message schema {
        //   required group my_map (MAP) {
        //     repeated group key_value {
        //       required int32 key;
        //       optional int32 value;
        //     }
        //   }
        //   required group my_map_no_v (MAP) {
        //     repeated group key_value {
        //       required int32 key;
        //     }
        //   }
        //   required group my_list (LIST) {
        //     repeated group list {
        //       required int32 element;
        //     }
        //   }
        // }
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        // my_map_no_v and my_list columns should now be equivalent
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }
}