parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains the reader which reads Parquet data into Arrow [`RecordBatch`]es
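//!
//! A minimal usage sketch (the in-memory file written below is only for illustration):
//!
//! ```
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use arrow_array::{Int32Array, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//! # use parquet::arrow::ArrowWriter;
//! use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # let mut file: Vec<u8> = Vec::new();
//! # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
//! # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
//! # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
//! # writer.write(&batch).unwrap();
//! # writer.close().unwrap();
//! # let file = Bytes::from(file);
//! let reader = ParquetRecordBatchReaderBuilder::try_new(file)
//!     .unwrap()
//!     .build()
//!     .unwrap();
//! for batch in reader {
//!     println!("read {} rows", batch.unwrap().num_rows());
//! }
//! ```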
19
20use arrow_array::Array;
21use arrow_array::cast::AsArray;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
32use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
33use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
34use crate::bloom_filter::{
35    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
36};
37use crate::column::page::{PageIterator, PageReader};
38#[cfg(feature = "encryption")]
39use crate::encryption::decrypt::FileDecryptionProperties;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
42use crate::file::reader::{ChunkReader, SerializedPageReader};
43use crate::schema::types::SchemaDescriptor;
44
45use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
46pub use read_plan::{ReadPlan, ReadPlanBuilder};
47
48mod filter;
49pub mod metrics;
50mod read_plan;
51mod selection;
52pub mod statistics;
53
54/// Builder for constructing Parquet readers that decode into [Apache Arrow]
55/// arrays.
56///
57/// Most users should use one of the following specializations:
58///
59/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
60/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
61///
62/// # Features
63/// * Projection pushdown: [`Self::with_projection`]
64/// * Cached metadata: [`ArrowReaderMetadata::load`]
65/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
66/// * Row group filtering: [`Self::with_row_groups`]
67/// * Range filtering: [`Self::with_row_selection`]
68/// * Row level filtering: [`Self::with_row_filter`]
69///
70/// # Implementing Predicate Pushdown
71///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the lowest level optimizations.
///
/// However, most Parquet based systems will apply filters at many steps prior
/// to decoding, such as pruning files, row groups and data pages. This crate
77/// provides the low level APIs needed to implement such filtering, but does not
78/// include any logic to actually evaluate predicates. For example:
79///
80/// * [`Self::with_row_groups`] for Row Group pruning
81/// * [`Self::with_row_selection`] for data page pruning
82/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
83///
84/// The rationale for this design is that implementing predicate pushdown is a
85/// complex topic and varies significantly from system to system. For example
86///
87/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
88/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
89/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
90/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
91///
92/// You can read more about this design in the [Querying Parquet with
93/// Millisecond Latency] Arrow blog post.
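///
/// For example, a system that has already pruned row groups and rows itself
/// might configure the reader as in the following sketch (the row group
/// indexes, row selection and projection below are purely illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};
/// # let mut file: Vec<u8> = Vec::new();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// # let file = Bytes::from(file);
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
/// // only decode the first leaf column
/// let projection = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// let reader = builder
///     .with_row_groups(vec![0]) // row groups chosen by the caller
///     .with_row_selection(RowSelection::from(vec![
///         RowSelector::select(2), // rows chosen by the caller
///         RowSelector::skip(1),
///     ]))
///     .with_projection(projection)
///     .build()
///     .unwrap();
/// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
/// assert_eq!(batches[0].num_rows(), 2);
/// ```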
94///
95/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
96/// [Apache Arrow]: https://arrow.apache.org/
97/// [`StatisticsConverter`]: statistics::StatisticsConverter
98/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
99pub struct ArrowReaderBuilder<T> {
100    pub(crate) input: T,
101
102    pub(crate) metadata: Arc<ParquetMetaData>,
103
104    pub(crate) schema: SchemaRef,
105
106    pub(crate) fields: Option<Arc<ParquetField>>,
107
108    pub(crate) batch_size: usize,
109
110    pub(crate) row_groups: Option<Vec<usize>>,
111
112    pub(crate) projection: ProjectionMask,
113
114    pub(crate) filter: Option<RowFilter>,
115
116    pub(crate) selection: Option<RowSelection>,
117
118    pub(crate) limit: Option<usize>,
119
120    pub(crate) offset: Option<usize>,
121
122    pub(crate) metrics: ArrowReaderMetrics,
123
124    pub(crate) max_predicate_cache_size: usize,
125}
126
127impl<T: Debug> Debug for ArrowReaderBuilder<T> {
128    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("ArrowReaderBuilder<T>")
130            .field("input", &self.input)
131            .field("metadata", &self.metadata)
132            .field("schema", &self.schema)
133            .field("fields", &self.fields)
134            .field("batch_size", &self.batch_size)
135            .field("row_groups", &self.row_groups)
136            .field("projection", &self.projection)
137            .field("filter", &self.filter)
138            .field("selection", &self.selection)
139            .field("limit", &self.limit)
140            .field("offset", &self.offset)
141            .field("metrics", &self.metrics)
142            .finish()
143    }
144}
145
146impl<T> ArrowReaderBuilder<T> {
147    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
148        Self {
149            input,
150            metadata: metadata.metadata,
151            schema: metadata.schema,
152            fields: metadata.fields,
153            batch_size: 1024,
154            row_groups: None,
155            projection: ProjectionMask::all(),
156            filter: None,
157            selection: None,
158            limit: None,
159            offset: None,
160            metrics: ArrowReaderMetrics::Disabled,
161            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
162        }
163    }
164
165    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
166    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
167        &self.metadata
168    }
169
170    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
171    pub fn parquet_schema(&self) -> &SchemaDescriptor {
172        self.metadata.file_metadata().schema_descr()
173    }
174
175    /// Returns the arrow [`SchemaRef`] for this parquet file
176    pub fn schema(&self) -> &SchemaRef {
177        &self.schema
178    }
179
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If the batch_size is larger than the file's row count, it is clamped to the row count.
182    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
184        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
185        Self { batch_size, ..self }
186    }
187
188    /// Only read data from the provided row group indexes
189    ///
190    /// This is also called row group filtering
191    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
192        Self {
193            row_groups: Some(row_groups),
194            ..self
195        }
196    }
197
198    /// Only read data from the provided column indexes
199    pub fn with_projection(self, mask: ProjectionMask) -> Self {
200        Self {
201            projection: mask,
202            ..self
203        }
204    }
205
206    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
207    /// data into memory.
208    ///
209    /// This feature is used to restrict which rows are decoded within row
210    /// groups, skipping ranges of rows that are not needed. Such selections
211    /// could be determined by evaluating predicates against the parquet page
212    /// [`Index`] or some other external information available to a query
213    /// engine.
214    ///
215    /// # Notes
216    ///
217    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
218    /// applying the row selection, and therefore rows from skipped row groups
219    /// should not be included in the [`RowSelection`] (see example below)
220    ///
221    /// It is recommended to enable writing the page index if using this
222    /// functionality, to allow more efficient skipping over data pages. See
223    /// [`ArrowReaderOptions::with_page_index`].
224    ///
225    /// # Example
226    ///
227    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
228    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
229    /// row group 3:
230    ///
231    /// ```text
232    ///   Row Group 0, 1000 rows (selected)
233    ///   Row Group 1, 1000 rows (skipped)
234    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
235    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
236    /// ```
237    ///
238    /// You could pass the following [`RowSelection`]:
239    ///
240    /// ```text
241    ///  Select 1000    (scan all rows in row group 0)
242    ///  Skip 50        (skip the first 50 rows in row group 2)
243    ///  Select 50      (scan rows 50-100 in row group 2)
244    ///  Skip 900       (skip the remaining rows in row group 2)
245    ///  Skip 200       (skip the first 200 rows in row group 3)
246    ///  Select 100     (scan rows 200-300 in row group 3)
247    ///  Skip 700       (skip the remaining rows in row group 3)
248    /// ```
249    /// Note there is no entry for the (entirely) skipped row group 1.
250    ///
251    /// Note you can represent the same selection with fewer entries. Instead of
252    ///
253    /// ```text
254    ///  Skip 900       (skip the remaining rows in row group 2)
255    ///  Skip 200       (skip the first 200 rows in row group 3)
256    /// ```
257    ///
258    /// you could use
259    ///
260    /// ```text
261    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
262    /// ```
263    ///
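    /// As a sketch, the merged selection above could be constructed from
    /// [`RowSelector`]s like this (the counts are the illustrative ones from the
    /// example):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // all of row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // rest of row group 2 + first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // remainder of row group 3
    /// ]);
    /// ```
    ///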
264    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
265    pub fn with_row_selection(self, selection: RowSelection) -> Self {
266        Self {
267            selection: Some(selection),
268            ..self
269        }
270    }
271
272    /// Provide a [`RowFilter`] to skip decoding rows
273    ///
274    /// Row filters are applied after row group selection and row selection
275    ///
276    /// It is recommended to enable reading the page index if using this functionality, to allow
277    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
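    ///
    /// A minimal sketch of a predicate that keeps rows where an `i32` column is
    /// greater than 1 (the schema and the hand-built boolean mask are purely
    /// illustrative; real predicates typically use arrow compute kernels):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{BooleanArray, Int32Array, RecordBatch};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    /// # let mut file: Vec<u8> = Vec::new();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // the predicate only needs to decode the "i32" column
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch: RecordBatch| {
    ///     let col = batch.column(0).as_primitive::<Int32Type>();
    ///     // keep rows where the value is > 1
    ///     Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|v| v > 1))))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// assert_eq!(batches[0].num_rows(), 2);
    /// ```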
278    pub fn with_row_filter(self, filter: RowFilter) -> Self {
279        Self {
280            filter: Some(filter),
281            ..self
282        }
283    }
284
285    /// Provide a limit to the number of rows to be read
286    ///
287    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
288    /// allowing it to limit the final set of rows decoded after any pushed down predicates
289    ///
290    /// It is recommended to enable reading the page index if using this functionality, to allow
291    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
292    pub fn with_limit(self, limit: usize) -> Self {
293        Self {
294            limit: Some(limit),
295            ..self
296        }
297    }
298
299    /// Provide an offset to skip over the given number of rows
300    ///
301    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
302    /// allowing it to skip rows after any pushed down predicates
303    ///
304    /// It is recommended to enable reading the page index if using this functionality, to allow
305    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
306    pub fn with_offset(self, offset: usize) -> Self {
307        Self {
308            offset: Some(offset),
309            ..self
310        }
311    }
312
313    /// Specify metrics collection during reading
314    ///
315    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
316    /// clone of the provided metrics to the builder.
317    ///
318    /// For example:
319    ///
320    /// ```rust
321    /// # use std::sync::Arc;
322    /// # use bytes::Bytes;
323    /// # use arrow_array::{Int32Array, RecordBatch};
324    /// # use arrow_schema::{DataType, Field, Schema};
325    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
326    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
327    /// # use parquet::arrow::ArrowWriter;
328    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
329    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
330    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
331    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
332    /// # writer.write(&batch).unwrap();
333    /// # writer.close().unwrap();
334    /// # let file = Bytes::from(file);
335    /// // Create metrics object to pass into the reader
336    /// let metrics = ArrowReaderMetrics::enabled();
337    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
338    ///   // Configure the builder to use the metrics by passing a clone
339    ///   .with_metrics(metrics.clone())
340    ///   // Build the reader
341    ///   .build().unwrap();
342    /// // .. read data from the reader ..
343    ///
344    /// // check the metrics
345    /// assert!(metrics.records_read_from_inner().is_some());
346    /// ```
347    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
348        Self { metrics, ..self }
349    }
350
351    /// Set the maximum size (per row group) of the predicate cache in bytes for
352    /// the async decoder.
353    ///
354    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
355    /// unlimited cache size.
356    ///
357    /// This cache is used to store decoded arrays that are used in
358    /// predicate evaluation ([`Self::with_row_filter`]).
359    ///
360    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
361    /// [this ticket] for more details and alternatives.
362    ///
363    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
364    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
365    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
366        Self {
367            max_predicate_cache_size,
368            ..self
369        }
370    }
371}
372
373/// Options that control how metadata is read for a parquet file
374///
375/// See [`ArrowReaderBuilder`] for how to configure how the column data
376/// is then read from the file, including projection and filter pushdown
377#[derive(Debug, Clone, Default)]
378pub struct ArrowReaderOptions {
379    /// Should the reader strip any user defined metadata from the Arrow schema
380    skip_arrow_metadata: bool,
    /// If provided, this is used as the schema hint when determining the Arrow schema;
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
383    ///
384    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
385    supplied_schema: Option<SchemaRef>,
386    /// Policy for reading offset and column indexes.
387    pub(crate) page_index_policy: PageIndexPolicy,
388    /// If encryption is enabled, the file decryption properties can be provided
389    #[cfg(feature = "encryption")]
390    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
391}
392
393impl ArrowReaderOptions {
394    /// Create a new [`ArrowReaderOptions`] with the default settings
395    pub fn new() -> Self {
396        Self::default()
397    }
398
399    /// Skip decoding the embedded arrow metadata (defaults to `false`)
400    ///
401    /// Parquet files generated by some writers may contain embedded arrow
402    /// schema and metadata.
403    /// This may not be correct or compatible with your system,
404    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
405    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
406        Self {
407            skip_arrow_metadata,
408            ..self
409        }
410    }
411
412    /// Provide a schema hint to use when reading the Parquet file.
413    ///
414    /// If provided, this schema takes precedence over any arrow schema embedded
415    /// in the metadata (see the [`arrow`] documentation for more details).
416    ///
417    /// If the provided schema is not compatible with the data stored in the
418    /// parquet file schema, an error will be returned when constructing the
419    /// builder.
420    ///
421    /// This option is only required if you want to explicitly control the
422    /// conversion of Parquet types to Arrow types, such as casting a column to
423    /// a different type. For example, if you wanted to read an Int64 in
424    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
425    ///
426    /// [`arrow`]: crate::arrow
427    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
428    ///
429    /// # Notes
430    ///
431    /// The provided schema must have the same number of columns as the parquet schema and
432    /// the column names must be the same.
433    ///
434    /// # Example
435    /// ```
437    /// use std::sync::Arc;
438    /// use tempfile::tempfile;
439    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
440    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
441    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
442    /// use parquet::arrow::ArrowWriter;
443    ///
444    /// // Write data - schema is inferred from the data to be Int32
445    /// let file = tempfile().unwrap();
446    /// let batch = RecordBatch::try_from_iter(vec![
447    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
448    /// ]).unwrap();
449    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
450    /// writer.write(&batch).unwrap();
451    /// writer.close().unwrap();
452    ///
453    /// // Read the file back.
454    /// // Supply a schema that interprets the Int32 column as a Timestamp.
455    /// let supplied_schema = Arc::new(Schema::new(vec![
456    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
457    /// ]));
458    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
459    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
460    ///     file.try_clone().unwrap(),
461    ///     options
462    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
463    ///
464    /// // Create the reader and read the data using the supplied schema.
465    /// let mut reader = builder.build().unwrap();
466    /// let _batch = reader.next().unwrap().unwrap();
467    /// ```
468    pub fn with_schema(self, schema: SchemaRef) -> Self {
469        Self {
470            supplied_schema: Some(schema),
471            skip_arrow_metadata: true,
472            ..self
473        }
474    }
475
476    /// Enable reading [`PageIndex`], if present (defaults to `false`)
477    ///
    /// Some query engines can use the `PageIndex` to push down predicates to the
    /// parquet scan, potentially eliminating unnecessary IO.
480    ///
481    /// If this is enabled, [`ParquetMetaData::column_index`] and
482    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
483    /// information is present in the file.
484    ///
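    /// A minimal sketch of enabling this option:
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    ///
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// assert!(options.page_index());
    /// ```
    ///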
485    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
486    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
487    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
488    pub fn with_page_index(self, page_index: bool) -> Self {
489        let page_index_policy = PageIndexPolicy::from(page_index);
490
491        Self {
492            page_index_policy,
493            ..self
494        }
495    }
496
497    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
498    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
499        Self {
500            page_index_policy: policy,
501            ..self
502        }
503    }
504
505    /// Provide the file decryption properties to use when reading encrypted parquet files.
506    ///
507    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
508    #[cfg(feature = "encryption")]
509    pub fn with_file_decryption_properties(
510        self,
511        file_decryption_properties: FileDecryptionProperties,
512    ) -> Self {
513        Self {
514            file_decryption_properties: Some(file_decryption_properties),
515            ..self
516        }
517    }
518
519    /// Retrieve the currently set page index behavior.
520    ///
521    /// This can be set via [`with_page_index`][Self::with_page_index].
522    pub fn page_index(&self) -> bool {
523        self.page_index_policy != PageIndexPolicy::Skip
524    }
525
526    /// Retrieve the currently set file decryption properties.
527    ///
528    /// This can be set via
529    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
530    #[cfg(feature = "encryption")]
531    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
532        self.file_decryption_properties.as_ref()
533    }
534}
535
536/// The metadata necessary to construct a [`ArrowReaderBuilder`]
537///
538/// Note this structure is cheaply clone-able as it consists of several arcs.
539///
540/// This structure allows
541///
542/// 1. Loading metadata for a file once and then using that same metadata to
543///    construct multiple separate readers, for example, to distribute readers
544///    across multiple threads
545///
546/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
547///    from the file each time a reader is constructed.
548///
549/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
550#[derive(Debug, Clone)]
551pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known ahead of time
553    pub(crate) metadata: Arc<ParquetMetaData>,
554    /// The Arrow Schema
555    pub(crate) schema: SchemaRef,
556
557    pub(crate) fields: Option<Arc<ParquetField>>,
558}
559
560impl ArrowReaderMetadata {
561    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
562    ///
563    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
564    /// example of how this can be used
565    ///
566    /// # Notes
567    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, and
    /// the page index is not already present in the metadata, this function will
    /// attempt to load it by performing additional reads from the `reader`.
571    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
572        let metadata =
573            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
574        #[cfg(feature = "encryption")]
575        let metadata =
576            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
577        let metadata = metadata.parse_and_finish(reader)?;
578        Self::try_new(Arc::new(metadata), options)
579    }
580
581    /// Create a new [`ArrowReaderMetadata`]
582    ///
583    /// # Notes
584    ///
585    /// This function does not attempt to load the PageIndex if not present in the metadata.
586    /// See [`Self::load`] for more details.
587    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
588        match options.supplied_schema {
589            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
590            None => {
591                let kv_metadata = match options.skip_arrow_metadata {
592                    true => None,
593                    false => metadata.file_metadata().key_value_metadata(),
594                };
595
596                let (schema, fields) = parquet_to_arrow_schema_and_fields(
597                    metadata.file_metadata().schema_descr(),
598                    ProjectionMask::all(),
599                    kv_metadata,
600                )?;
601
602                Ok(Self {
603                    metadata,
604                    schema: Arc::new(schema),
605                    fields: fields.map(Arc::new),
606                })
607            }
608        }
609    }
610
611    fn with_supplied_schema(
612        metadata: Arc<ParquetMetaData>,
613        supplied_schema: SchemaRef,
614    ) -> Result<Self> {
615        let parquet_schema = metadata.file_metadata().schema_descr();
616        let field_levels = parquet_to_arrow_field_levels(
617            parquet_schema,
618            ProjectionMask::all(),
619            Some(supplied_schema.fields()),
620        )?;
621        let fields = field_levels.fields;
622        let inferred_len = fields.len();
623        let supplied_len = supplied_schema.fields().len();
624        // Ensure the supplied schema has the same number of columns as the parquet schema.
625        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
626        // different lengths, but we check here to be safe.
627        if inferred_len != supplied_len {
628            return Err(arrow_err!(format!(
629                "Incompatible supplied Arrow schema: expected {} columns received {}",
630                inferred_len, supplied_len
631            )));
632        }
633
634        let mut errors = Vec::new();
635
636        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
637
638        for (field1, field2) in field_iter {
639            if field1.data_type() != field2.data_type() {
640                errors.push(format!(
641                    "data type mismatch for field {}: requested {} but found {}",
642                    field1.name(),
643                    field1.data_type(),
644                    field2.data_type()
645                ));
646            }
647            if field1.is_nullable() != field2.is_nullable() {
648                errors.push(format!(
649                    "nullability mismatch for field {}: expected {:?} but found {:?}",
650                    field1.name(),
651                    field1.is_nullable(),
652                    field2.is_nullable()
653                ));
654            }
655            if field1.metadata() != field2.metadata() {
656                errors.push(format!(
657                    "metadata mismatch for field {}: expected {:?} but found {:?}",
658                    field1.name(),
659                    field1.metadata(),
660                    field2.metadata()
661                ));
662            }
663        }
664
665        if !errors.is_empty() {
666            let message = errors.join(", ");
667            return Err(ParquetError::ArrowError(format!(
668                "Incompatible supplied Arrow schema: {message}",
669            )));
670        }
671
672        Ok(Self {
673            metadata,
674            schema: supplied_schema,
675            fields: field_levels.levels.map(Arc::new),
676        })
677    }
678
679    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
680    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
681        &self.metadata
682    }
683
684    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
685    pub fn parquet_schema(&self) -> &SchemaDescriptor {
686        self.metadata.file_metadata().schema_descr()
687    }
688
689    /// Returns the arrow [`SchemaRef`] for this parquet file
690    pub fn schema(&self) -> &SchemaRef {
691        &self.schema
692    }
693}
694
695#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
697pub struct SyncReader<T: ChunkReader>(T);
698
699impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
700    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
701        f.debug_tuple("SyncReader").field(&self.0).finish()
702    }
703}
704
705/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
706///
707/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
708///
709/// See [`ArrowReaderBuilder`] for additional member functions
710pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
711
712impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
713    /// Create a new [`ParquetRecordBatchReaderBuilder`]
714    ///
715    /// ```
716    /// # use std::sync::Arc;
717    /// # use bytes::Bytes;
718    /// # use arrow_array::{Int32Array, RecordBatch};
719    /// # use arrow_schema::{DataType, Field, Schema};
720    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
721    /// # use parquet::arrow::ArrowWriter;
722    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
723    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
724    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
725    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
726    /// # writer.write(&batch).unwrap();
727    /// # writer.close().unwrap();
728    /// # let file = Bytes::from(file);
729    /// #
730    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
731    ///
732    /// // Inspect metadata
733    /// assert_eq!(builder.metadata().num_row_groups(), 1);
734    ///
735    /// // Construct reader
736    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
737    ///
738    /// // Read data
739    /// let _batch = reader.next().unwrap().unwrap();
740    /// ```
741    pub fn try_new(reader: T) -> Result<Self> {
742        Self::try_new_with_options(reader, Default::default())
743    }
744
745    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
746    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
747        let metadata = ArrowReaderMetadata::load(&reader, options)?;
748        Ok(Self::new_with_metadata(reader, metadata))
749    }
750
751    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
752    ///
753    /// This interface allows:
754    ///
755    /// 1. Loading metadata once and using it to create multiple builders with
756    ///    potentially different settings or run on different threads
757    ///
758    /// 2. Using a cached copy of the metadata rather than re-reading it from the
759    ///    file each time a reader is constructed.
760    ///
761    /// See the docs on [`ArrowReaderMetadata`] for more details
762    ///
763    /// # Example
764    /// ```
765    /// # use std::fs::metadata;
766    /// # use std::sync::Arc;
767    /// # use bytes::Bytes;
768    /// # use arrow_array::{Int32Array, RecordBatch};
769    /// # use arrow_schema::{DataType, Field, Schema};
770    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
771    /// # use parquet::arrow::ArrowWriter;
772    /// #
773    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
774    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
775    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
776    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
777    /// # writer.write(&batch).unwrap();
778    /// # writer.close().unwrap();
779    /// # let file = Bytes::from(file);
780    /// #
781    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
782    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
783    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
784    ///
785    /// // Should be able to read from both in parallel
786    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
787    /// ```
788    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
789        Self::new_builder(SyncReader(input), metadata)
790    }
791
792    /// Read bloom filter for a column in a row group
793    ///
794    /// Returns `None` if the column does not have a bloom filter
795    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
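    ///
    /// A sketch of checking a value against a column's bloom filter. This assumes
    /// the file was written with bloom filters enabled; the column and row group
    /// indexes below are illustrative:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # use parquet::file::properties::WriterProperties;
    /// # let mut file: Vec<u8> = Vec::new();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let props = WriterProperties::builder().set_bloom_filter_enabled(true).build();
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(props)).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // bloom filter for column 0 of row group 0
    /// let sbbf = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .unwrap()
    ///     .expect("bloom filter was written");
    /// assert!(sbbf.check(&1_i32)); // 1 was written, so it must be reported as present
    /// ```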
797    pub fn get_row_group_column_bloom_filter(
798        &self,
799        row_group_idx: usize,
800        column_idx: usize,
801    ) -> Result<Option<Sbbf>> {
802        let metadata = self.metadata.row_group(row_group_idx);
803        let column_metadata = metadata.column(column_idx);
804
805        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
806            offset
807                .try_into()
808                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
809        } else {
810            return Ok(None);
811        };
812
813        let buffer = match column_metadata.bloom_filter_length() {
814            Some(length) => self.input.0.get_bytes(offset, length as usize),
815            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
816        }?;
817
818        let (header, bitset_offset) =
819            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
820
821        match header.algorithm {
822            BloomFilterAlgorithm::BLOCK => {
823                // this match exists to future proof the singleton algorithm enum
824            }
825        }
826        match header.compression {
827            BloomFilterCompression::UNCOMPRESSED => {
828                // this match exists to future proof the singleton compression enum
829            }
830        }
831        match header.hash {
832            BloomFilterHash::XXHASH => {
833                // this match exists to future proof the singleton hash enum
834            }
835        }
836
837        let bitset = match column_metadata.bloom_filter_length() {
838            Some(_) => buffer.slice(
839                (TryInto::<usize>::try_into(bitset_offset).unwrap()
840                    - TryInto::<usize>::try_into(offset).unwrap())..,
841            ),
842            None => {
843                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
844                    ParquetError::General("Bloom filter length is invalid".to_string())
845                })?;
846                self.input.0.get_bytes(bitset_offset, bitset_length)?
847            }
848        };
849        Ok(Some(Sbbf::new(&bitset)))
850    }
851
852    /// Build a [`ParquetRecordBatchReader`]
853    ///
854    /// Note: this will eagerly evaluate any `RowFilter` before returning
855    pub fn build(self) -> Result<ParquetRecordBatchReader> {
856        let Self {
857            input,
858            metadata,
859            schema: _,
860            fields,
861            batch_size: _,
862            row_groups,
863            projection,
864            mut filter,
865            selection,
866            limit,
867            offset,
868            metrics,
869            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
870            max_predicate_cache_size: _,
871        } = self;
872
        // Try to avoid allocating a large buffer
874        let batch_size = self
875            .batch_size
876            .min(metadata.file_metadata().num_rows() as usize);
877
878        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
879
880        let reader = ReaderRowGroups {
881            reader: Arc::new(input.0),
882            metadata,
883            row_groups,
884        };
885
886        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
887
888        // Update selection based on any filters
889        if let Some(filter) = filter.as_mut() {
890            for predicate in filter.predicates.iter_mut() {
891                // break early if we have ruled out all rows
892                if !plan_builder.selects_any() {
893                    break;
894                }
895
896                let mut cache_projection = predicate.projection().clone();
897                cache_projection.intersect(&projection);
898
899                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
900                    .build_array_reader(fields.as_deref(), predicate.projection())?;
901
902                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
903            }
904        }
905
906        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
907            .build_array_reader(fields.as_deref(), &projection)?;
908
909        let read_plan = plan_builder
910            .limited(reader.num_rows())
911            .with_offset(offset)
912            .with_limit(limit)
913            .build_limited()
914            .build();
915
916        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
917    }
918}
919
920struct ReaderRowGroups<T: ChunkReader> {
921    reader: Arc<T>,
922
923    metadata: Arc<ParquetMetaData>,
924    /// Optional list of row group indices to scan
925    row_groups: Vec<usize>,
926}
927
928impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
929    fn num_rows(&self) -> usize {
930        let meta = self.metadata.row_groups();
931        self.row_groups
932            .iter()
933            .map(|x| meta[*x].num_rows() as usize)
934            .sum()
935    }
936
937    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
938        Ok(Box::new(ReaderPageIterator {
939            column_idx: i,
940            reader: self.reader.clone(),
941            metadata: self.metadata.clone(),
942            row_groups: self.row_groups.clone().into_iter(),
943        }))
944    }
945}
946
947struct ReaderPageIterator<T: ChunkReader> {
948    reader: Arc<T>,
949    column_idx: usize,
950    row_groups: std::vec::IntoIter<usize>,
951    metadata: Arc<ParquetMetaData>,
952}
953
954impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
955    /// Return the next SerializedPageReader
956    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
957        let rg = self.metadata.row_group(rg_idx);
958        let column_chunk_metadata = rg.column(self.column_idx);
959        let offset_index = self.metadata.offset_index();
        // `offset_index` may not exist and `i[rg_idx]` will be empty.
        // To avoid an `i[rg_idx][self.column_idx]` panic, we need to filter out empty `i[rg_idx]`.
962        let page_locations = offset_index
963            .filter(|i| !i[rg_idx].is_empty())
964            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
965        let total_rows = rg.num_rows() as usize;
966        let reader = self.reader.clone();
967
968        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
969            .add_crypto_context(
970                rg_idx,
971                self.column_idx,
972                self.metadata.as_ref(),
973                column_chunk_metadata,
974            )
975    }
976}
977
978impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
979    type Item = Result<Box<dyn PageReader>>;
980
981    fn next(&mut self) -> Option<Self::Item> {
982        let rg_idx = self.row_groups.next()?;
983        let page_reader = self
984            .next_page_reader(rg_idx)
985            .map(|page_reader| Box::new(page_reader) as _);
986        Some(page_reader)
987    }
988}
989
990impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
991
992/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
993/// read from a parquet data source
994pub struct ParquetRecordBatchReader {
995    array_reader: Box<dyn ArrayReader>,
996    schema: SchemaRef,
997    read_plan: ReadPlan,
998}
999
1000impl Iterator for ParquetRecordBatchReader {
1001    type Item = Result<RecordBatch, ArrowError>;
1002
1003    fn next(&mut self) -> Option<Self::Item> {
1004        self.next_inner()
1005            .map_err(|arrow_err| arrow_err.into())
1006            .transpose()
1007    }
1008}
1009
1010impl ParquetRecordBatchReader {
1011    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1012    /// has reached the end of the file.
1013    ///
1014    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1015    /// simplify error handling with `?`
1016    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1017        let mut read_records = 0;
1018        let batch_size = self.batch_size();
1019        match self.read_plan.selection_mut() {
1020            Some(selection) => {
1021                while read_records < batch_size && !selection.is_empty() {
1022                    let front = selection.pop_front().unwrap();
1023                    if front.skip {
1024                        let skipped = self.array_reader.skip_records(front.row_count)?;
1025
1026                        if skipped != front.row_count {
1027                            return Err(general_err!(
1028                                "failed to skip rows, expected {}, got {}",
1029                                front.row_count,
1030                                skipped
1031                            ));
1032                        }
1033                        continue;
1034                    }
1035
                    // Currently, when RowSelectors with row_count = 0 are included, they are interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1038                    if front.row_count == 0 {
1039                        continue;
1040                    }
1041
1042                    // try to read record
1043                    let need_read = batch_size - read_records;
1044                    let to_read = match front.row_count.checked_sub(need_read) {
1045                        Some(remaining) if remaining != 0 => {
                            // The selector has more rows than needed to fill this batch: read only what
                            // is needed and push the remainder back (guarding against pushing an empty selector).
1048                            selection.push_front(RowSelector::select(remaining));
1049                            need_read
1050                        }
1051                        _ => front.row_count,
1052                    };
1053                    match self.array_reader.read_records(to_read)? {
1054                        0 => break,
1055                        rec => read_records += rec,
1056                    };
1057                }
1058            }
1059            None => {
1060                self.array_reader.read_records(batch_size)?;
1061            }
1062        };
1063
1064        let array = self.array_reader.consume_batch()?;
1065        let struct_array = array.as_struct_opt().ok_or_else(|| {
1066            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1067        })?;
1068
1069        Ok(if struct_array.len() > 0 {
1070            Some(RecordBatch::from(struct_array))
1071        } else {
1072            None
1073        })
1074    }
1075}
1076
1077impl RecordBatchReader for ParquetRecordBatchReader {
1078    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1079    ///
1080    /// Note that the schema metadata will be stripped here. See
1081    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1082    fn schema(&self) -> SchemaRef {
1083        self.schema.clone()
1084    }
1085}
1086
1087impl ParquetRecordBatchReader {
1088    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1089    ///
1090    /// See [`ParquetRecordBatchReaderBuilder`] for more options
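    ///
    /// A minimal sketch (the in-memory file written below is only for illustration):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # let mut file: Vec<u8> = Vec::new();
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // read batches of up to 1024 rows
    /// let mut reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// let batch = reader.next().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 3);
    /// ```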
1091    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1092        ParquetRecordBatchReaderBuilder::try_new(reader)?
1093            .with_batch_size(batch_size)
1094            .build()
1095    }
1096
1097    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1098    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
    /// higher-level interface for reading parquet data from a file
1101    pub fn try_new_with_row_groups(
1102        levels: &FieldLevels,
1103        row_groups: &dyn RowGroups,
1104        batch_size: usize,
1105        selection: Option<RowSelection>,
1106    ) -> Result<Self> {
1107        // note metrics are not supported in this API
1108        let metrics = ArrowReaderMetrics::disabled();
1109        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1110            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1111
1112        let read_plan = ReadPlanBuilder::new(batch_size)
1113            .with_selection(selection)
1114            .build();
1115
1116        Ok(Self {
1117            array_reader,
1118            schema: Arc::new(Schema::new(levels.fields.clone())),
1119            read_plan,
1120        })
1121    }
1122
1123    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1124    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1125    /// all rows will be returned
1126    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1127        let schema = match array_reader.get_data_type() {
1128            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1129            _ => unreachable!("Struct array reader's data type is not struct!"),
1130        };
1131
1132        Self {
1133            array_reader,
1134            schema: Arc::new(schema),
1135            read_plan,
1136        }
1137    }
1138
1139    #[inline(always)]
1140    pub(crate) fn batch_size(&self) -> usize {
1141        self.read_plan.batch_size()
1142    }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use std::cmp::min;
1148    use std::collections::{HashMap, VecDeque};
1149    use std::fmt::Formatter;
1150    use std::fs::File;
1151    use std::io::Seek;
1152    use std::path::PathBuf;
1153    use std::sync::Arc;
1154
1155    use arrow_array::builder::*;
1156    use arrow_array::cast::AsArray;
1157    use arrow_array::types::{
1158        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1159        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1160        Time64MicrosecondType,
1161    };
1162    use arrow_array::*;
1163    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, i256};
1164    use arrow_data::{ArrayData, ArrayDataBuilder};
1165    use arrow_schema::{
1166        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1167    };
1168    use arrow_select::concat::concat_batches;
1169    use bytes::Bytes;
1170    use half::f16;
1171    use num_traits::PrimInt;
1172    use rand::{Rng, RngCore, rng};
1173    use tempfile::tempfile;
1174
1175    use crate::arrow::arrow_reader::{
1176        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1177        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1178    };
1179    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1180    use crate::arrow::{ArrowWriter, ProjectionMask};
1181    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1182    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1183    use crate::data_type::{
1184        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1185        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1186    };
1187    use crate::errors::Result;
1188    use crate::file::metadata::ParquetMetaData;
1189    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1190    use crate::file::writer::SerializedFileWriter;
1191    use crate::schema::parser::parse_message_type;
1192    use crate::schema::types::{Type, TypePtr};
1193    use crate::util::test_common::rand_gen::RandGen;
1194
1195    #[test]
1196    fn test_arrow_reader_all_columns() {
1197        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1198
1199        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1200        let original_schema = Arc::clone(builder.schema());
1201        let reader = builder.build().unwrap();
1202
1203        // Verify that the schema was correctly parsed
1204        assert_eq!(original_schema.fields(), reader.schema().fields());
1205    }
1206
1207    #[test]
1208    fn test_arrow_reader_single_column() {
1209        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1210
1211        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1212        let original_schema = Arc::clone(builder.schema());
1213
1214        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1215        let reader = builder.with_projection(mask).build().unwrap();
1216
1217        // Verify that the schema was correctly parsed
1218        assert_eq!(1, reader.schema().fields().len());
1219        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1220    }
1221
1222    #[test]
1223    fn test_arrow_reader_single_column_by_name() {
1224        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1225
1226        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1227        let original_schema = Arc::clone(builder.schema());
1228
1229        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1230        let reader = builder.with_projection(mask).build().unwrap();
1231
1232        // Verify that the schema was correctly parsed
1233        assert_eq!(1, reader.schema().fields().len());
1234        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1235    }
1236
1237    #[test]
1238    fn test_null_column_reader_test() {
1239        let mut file = tempfile::tempfile().unwrap();
1240
1241        let schema = "
1242            message message {
1243                OPTIONAL INT32 int32;
1244            }
1245        ";
1246        let schema = Arc::new(parse_message_type(schema).unwrap());
1247
1248        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1249        generate_single_column_file_with_data::<Int32Type>(
1250            &[vec![], vec![]],
1251            Some(&def_levels),
1252            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1253            schema,
1254            Some(Field::new("int32", ArrowDataType::Null, true)),
1255            &Default::default(),
1256        )
1257        .unwrap();
1258
1259        file.rewind().unwrap();
1260
1261        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1262        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1263
1264        assert_eq!(batches.len(), 4);
1265        for batch in &batches[0..3] {
1266            assert_eq!(batch.num_rows(), 2);
1267            assert_eq!(batch.num_columns(), 1);
1268            assert_eq!(batch.column(0).null_count(), 2);
1269        }
1270
1271        assert_eq!(batches[3].num_rows(), 1);
1272        assert_eq!(batches[3].num_columns(), 1);
1273        assert_eq!(batches[3].column(0).null_count(), 1);
1274    }
1275
1276    #[test]
1277    fn test_primitive_single_column_reader_test() {
1278        run_single_column_reader_tests::<BoolType, _, BoolType>(
1279            2,
1280            ConvertedType::NONE,
1281            None,
1282            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1283            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1284        );
1285        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1286            2,
1287            ConvertedType::NONE,
1288            None,
1289            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1290            &[
1291                Encoding::PLAIN,
1292                Encoding::RLE_DICTIONARY,
1293                Encoding::DELTA_BINARY_PACKED,
1294                Encoding::BYTE_STREAM_SPLIT,
1295            ],
1296        );
1297        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1298            2,
1299            ConvertedType::NONE,
1300            None,
1301            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1302            &[
1303                Encoding::PLAIN,
1304                Encoding::RLE_DICTIONARY,
1305                Encoding::DELTA_BINARY_PACKED,
1306                Encoding::BYTE_STREAM_SPLIT,
1307            ],
1308        );
1309        run_single_column_reader_tests::<FloatType, _, FloatType>(
1310            2,
1311            ConvertedType::NONE,
1312            None,
1313            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1314            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1315        );
1316    }
1317
1318    #[test]
1319    fn test_unsigned_primitive_single_column_reader_test() {
1320        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1321            2,
1322            ConvertedType::UINT_32,
1323            Some(ArrowDataType::UInt32),
1324            |vals| {
1325                Arc::new(UInt32Array::from_iter(
1326                    vals.iter().map(|x| x.map(|x| x as u32)),
1327                ))
1328            },
1329            &[
1330                Encoding::PLAIN,
1331                Encoding::RLE_DICTIONARY,
1332                Encoding::DELTA_BINARY_PACKED,
1333            ],
1334        );
1335        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1336            2,
1337            ConvertedType::UINT_64,
1338            Some(ArrowDataType::UInt64),
1339            |vals| {
1340                Arc::new(UInt64Array::from_iter(
1341                    vals.iter().map(|x| x.map(|x| x as u64)),
1342                ))
1343            },
1344            &[
1345                Encoding::PLAIN,
1346                Encoding::RLE_DICTIONARY,
1347                Encoding::DELTA_BINARY_PACKED,
1348            ],
1349        );
1350    }
1351
1352    #[test]
1353    fn test_unsigned_roundtrip() {
1354        let schema = Arc::new(Schema::new(vec![
1355            Field::new("uint32", ArrowDataType::UInt32, true),
1356            Field::new("uint64", ArrowDataType::UInt64, true),
1357        ]));
1358
1359        let mut buf = Vec::with_capacity(1024);
1360        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1361
1362        let original = RecordBatch::try_new(
1363            schema,
1364            vec![
1365                Arc::new(UInt32Array::from_iter_values([
1366                    0,
1367                    i32::MAX as u32,
1368                    u32::MAX,
1369                ])),
1370                Arc::new(UInt64Array::from_iter_values([
1371                    0,
1372                    i64::MAX as u64,
1373                    u64::MAX,
1374                ])),
1375            ],
1376        )
1377        .unwrap();
1378
1379        writer.write(&original).unwrap();
1380        writer.close().unwrap();
1381
1382        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1383        let ret = reader.next().unwrap().unwrap();
1384        assert_eq!(ret, original);
1385
1386        // Check they can be downcast to the correct type
1387        ret.column(0)
1388            .as_any()
1389            .downcast_ref::<UInt32Array>()
1390            .unwrap();
1391
1392        ret.column(1)
1393            .as_any()
1394            .downcast_ref::<UInt64Array>()
1395            .unwrap();
1396    }
1397
1398    #[test]
1399    fn test_float16_roundtrip() -> Result<()> {
1400        let schema = Arc::new(Schema::new(vec![
1401            Field::new("float16", ArrowDataType::Float16, false),
1402            Field::new("float16-nullable", ArrowDataType::Float16, true),
1403        ]));
1404
1405        let mut buf = Vec::with_capacity(1024);
1406        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1407
1408        let original = RecordBatch::try_new(
1409            schema,
1410            vec![
1411                Arc::new(Float16Array::from_iter_values([
1412                    f16::EPSILON,
1413                    f16::MIN,
1414                    f16::MAX,
1415                    f16::NAN,
1416                    f16::INFINITY,
1417                    f16::NEG_INFINITY,
1418                    f16::ONE,
1419                    f16::NEG_ONE,
1420                    f16::ZERO,
1421                    f16::NEG_ZERO,
1422                    f16::E,
1423                    f16::PI,
1424                    f16::FRAC_1_PI,
1425                ])),
1426                Arc::new(Float16Array::from(vec![
1427                    None,
1428                    None,
1429                    None,
1430                    Some(f16::NAN),
1431                    Some(f16::INFINITY),
1432                    Some(f16::NEG_INFINITY),
1433                    None,
1434                    None,
1435                    None,
1436                    None,
1437                    None,
1438                    None,
1439                    Some(f16::FRAC_1_PI),
1440                ])),
1441            ],
1442        )?;
1443
1444        writer.write(&original)?;
1445        writer.close()?;
1446
1447        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1448        let ret = reader.next().unwrap()?;
1449        assert_eq!(ret, original);
1450
1451        // Ensure the columns can be downcast to the correct type
1452        ret.column(0).as_primitive::<Float16Type>();
1453        ret.column(1).as_primitive::<Float16Type>();
1454
1455        Ok(())
1456    }
1457
1458    #[test]
1459    fn test_time_utc_roundtrip() -> Result<()> {
1460        let schema = Arc::new(Schema::new(vec![
1461            Field::new(
1462                "time_millis",
1463                ArrowDataType::Time32(TimeUnit::Millisecond),
1464                true,
1465            )
1466            .with_metadata(HashMap::from_iter(vec![(
1467                "adjusted_to_utc".to_string(),
1468                "".to_string(),
1469            )])),
1470            Field::new(
1471                "time_micros",
1472                ArrowDataType::Time64(TimeUnit::Microsecond),
1473                true,
1474            )
1475            .with_metadata(HashMap::from_iter(vec![(
1476                "adjusted_to_utc".to_string(),
1477                "".to_string(),
1478            )])),
1479        ]));
1480
1481        let mut buf = Vec::with_capacity(1024);
1482        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1483
1484        let original = RecordBatch::try_new(
1485            schema,
1486            vec![
1487                Arc::new(Time32MillisecondArray::from(vec![
1488                    Some(-1),
1489                    Some(0),
1490                    Some(86_399_000),
1491                    Some(86_400_000),
1492                    Some(86_401_000),
1493                    None,
1494                ])),
1495                Arc::new(Time64MicrosecondArray::from(vec![
1496                    Some(-1),
1497                    Some(0),
1498                    Some(86_399 * 1_000_000),
1499                    Some(86_400 * 1_000_000),
1500                    Some(86_401 * 1_000_000),
1501                    None,
1502                ])),
1503            ],
1504        )?;
1505
1506        writer.write(&original)?;
1507        writer.close()?;
1508
1509        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1510        let ret = reader.next().unwrap()?;
1511        assert_eq!(ret, original);
1512
1513        // Ensure the columns can be downcast to the correct type
1514        ret.column(0).as_primitive::<Time32MillisecondType>();
1515        ret.column(1).as_primitive::<Time64MicrosecondType>();
1516
1517        Ok(())
1518    }
1519
1520    #[test]
1521    fn test_date32_roundtrip() -> Result<()> {
1522        use arrow_array::Date32Array;
1523
1524        let schema = Arc::new(Schema::new(vec![Field::new(
1525            "date32",
1526            ArrowDataType::Date32,
1527            false,
1528        )]));
1529
1530        let mut buf = Vec::with_capacity(1024);
1531
1532        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1533
1534        let original = RecordBatch::try_new(
1535            schema,
1536            vec![Arc::new(Date32Array::from(vec![
1537                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1538            ]))],
1539        )?;
1540
1541        writer.write(&original)?;
1542        writer.close()?;
1543
1544        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1545        let ret = reader.next().unwrap()?;
1546        assert_eq!(ret, original);
1547
1548        // Ensure the column can be downcast to the correct type
1549        ret.column(0).as_primitive::<Date32Type>();
1550
1551        Ok(())
1552    }
1553
1554    #[test]
1555    fn test_date64_roundtrip() -> Result<()> {
1556        use arrow_array::Date64Array;
1557
1558        let schema = Arc::new(Schema::new(vec![
1559            Field::new("small-date64", ArrowDataType::Date64, false),
1560            Field::new("big-date64", ArrowDataType::Date64, false),
1561            Field::new("invalid-date64", ArrowDataType::Date64, false),
1562        ]));
1563
1564        let mut default_buf = Vec::with_capacity(1024);
1565        let mut coerce_buf = Vec::with_capacity(1024);
1566
1567        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1568
1569        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1570        let mut coerce_writer =
1571            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1572
1573        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1574
1575        let original = RecordBatch::try_new(
1576            schema,
1577            vec![
1578                // small-date64
1579                Arc::new(Date64Array::from(vec![
1580                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1581                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1582                    0,
1583                    1_000 * NUM_MILLISECONDS_IN_DAY,
1584                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1585                ])),
1586                // big-date64
1587                Arc::new(Date64Array::from(vec![
1588                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1589                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1590                    0,
1591                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1592                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1593                ])),
1594                // invalid-date64
1595                Arc::new(Date64Array::from(vec![
1596                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1597                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1598                    1,
1599                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1600                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1601                ])),
1602            ],
1603        )?;
1604
1605        default_writer.write(&original)?;
1606        coerce_writer.write(&original)?;
1607
1608        default_writer.close()?;
1609        coerce_writer.close()?;
1610
1611        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1612        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1613
1614        let default_ret = default_reader.next().unwrap()?;
1615        let coerce_ret = coerce_reader.next().unwrap()?;
1616
1617        // Roundtrip should be successful when default writer used
1618        assert_eq!(default_ret, original);
1619
1620        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1621        assert_eq!(coerce_ret.column(0), original.column(0));
1622        assert_ne!(coerce_ret.column(1), original.column(1));
1623        assert_ne!(coerce_ret.column(2), original.column(2));
1624
1625        // Ensure both can be downcast to the correct type
1626        default_ret.column(0).as_primitive::<Date64Type>();
1627        coerce_ret.column(0).as_primitive::<Date64Type>();
1628
1629        Ok(())
1630    }
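
    /// Generates random fixed-length byte arrays of the requested length,
    /// used to exercise FIXED_LEN_BYTE_ARRAY columns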
1631    struct RandFixedLenGen {}
1632
1633    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1634        fn r#gen(len: i32) -> FixedLenByteArray {
1635            let mut v = vec![0u8; len as usize];
1636            rng().fill_bytes(&mut v);
1637            ByteArray::from(v).into()
1638        }
1639    }
1640
1641    #[test]
1642    fn test_fixed_length_binary_column_reader() {
1643        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1644            20,
1645            ConvertedType::NONE,
1646            None,
1647            |vals| {
1648                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1649                for val in vals {
1650                    match val {
1651                        Some(b) => builder.append_value(b).unwrap(),
1652                        None => builder.append_null(),
1653                    }
1654                }
1655                Arc::new(builder.finish())
1656            },
1657            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1658        );
1659    }
1660
1661    #[test]
1662    fn test_interval_day_time_column_reader() {
1663        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1664            12,
1665            ConvertedType::INTERVAL,
1666            None,
1667            |vals| {
1668                Arc::new(
1669                    vals.iter()
1670                        .map(|x| {
1671                            x.as_ref().map(|b| IntervalDayTime {
1672                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1673                                milliseconds: i32::from_le_bytes(
1674                                    b.as_ref()[8..12].try_into().unwrap(),
1675                                ),
1676                            })
1677                        })
1678                        .collect::<IntervalDayTimeArray>(),
1679                )
1680            },
1681            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1682        );
1683    }
1684
1685    #[test]
1686    fn test_int96_single_column_reader_test() {
1687        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1688
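        // Pairs an optional Arrow type hint with a converter that builds the
        // expected ArrayRef from the raw Int96 values.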
1689        type TypeHintAndConversionFunction =
1690            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1691
1692        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1693            // Test without a specified ArrowType hint.
1694            (None, |vals: &[Option<Int96>]| {
1695                Arc::new(TimestampNanosecondArray::from_iter(
1696                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1697                )) as ArrayRef
1698            }),
1699            // Test other TimeUnits as ArrowType hints.
1700            (
1701                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1702                |vals: &[Option<Int96>]| {
1703                    Arc::new(TimestampSecondArray::from_iter(
1704                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1705                    )) as ArrayRef
1706                },
1707            ),
1708            (
1709                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1710                |vals: &[Option<Int96>]| {
1711                    Arc::new(TimestampMillisecondArray::from_iter(
1712                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1713                    )) as ArrayRef
1714                },
1715            ),
1716            (
1717                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1718                |vals: &[Option<Int96>]| {
1719                    Arc::new(TimestampMicrosecondArray::from_iter(
1720                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1721                    )) as ArrayRef
1722                },
1723            ),
1724            (
1725                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1726                |vals: &[Option<Int96>]| {
1727                    Arc::new(TimestampNanosecondArray::from_iter(
1728                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1729                    )) as ArrayRef
1730                },
1731            ),
1732            // Test another timezone with TimeUnit as ArrowType hints.
1733            (
1734                Some(ArrowDataType::Timestamp(
1735                    TimeUnit::Second,
1736                    Some(Arc::from("-05:00")),
1737                )),
1738                |vals: &[Option<Int96>]| {
1739                    Arc::new(
1740                        TimestampSecondArray::from_iter(
1741                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1742                        )
1743                        .with_timezone("-05:00"),
1744                    ) as ArrayRef
1745                },
1746            ),
1747        ];
1748
1749        resolutions.iter().for_each(|(arrow_type, converter)| {
1750            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1751                2,
1752                ConvertedType::NONE,
1753                arrow_type.clone(),
1754                converter,
1755                encodings,
1756            );
1757        })
1758    }
1759
1760    #[test]
1761    fn test_int96_from_spark_file_with_provided_schema() {
1762        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1763        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
1764        // microsecond resolution for the Parquet reader to interpret these values correctly.
1765        use arrow_schema::DataType::Timestamp;
1766        let test_data = arrow::util::test_util::parquet_test_data();
1767        let path = format!("{test_data}/int96_from_spark.parquet");
1768        let file = File::open(path).unwrap();
1769
1770        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1771            "a",
1772            Timestamp(TimeUnit::Microsecond, None),
1773            true,
1774        )]));
1775        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1776
1777        let mut record_reader =
1778            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1779                .unwrap()
1780                .build()
1781                .unwrap();
1782
1783        let batch = record_reader.next().unwrap().unwrap();
1784        assert_eq!(batch.num_columns(), 1);
1785        let column = batch.column(0);
1786        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1787
1788        let expected = Arc::new(Int64Array::from(vec![
1789            Some(1704141296123456),
1790            Some(1704070800000000),
1791            Some(253402225200000000),
1792            Some(1735599600000000),
1793            None,
1794            Some(9089380393200000000),
1795        ]));
1796
1797        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1798        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1799        // anyway, so this should be a zero-copy non-modifying cast.
1800
1801        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1802        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1803
1804        assert_eq!(casted_timestamps.len(), expected.len());
1805
1806        casted_timestamps
1807            .iter()
1808            .zip(expected.iter())
1809            .for_each(|(lhs, rhs)| {
1810                assert_eq!(lhs, rhs);
1811            });
1812    }
1813
1814    #[test]
1815    fn test_int96_from_spark_file_without_provided_schema() {
1816        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1817        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
1818        // values when read as nanosecond resolution overflow and result in garbage values.
1819        use arrow_schema::DataType::Timestamp;
1820        let test_data = arrow::util::test_util::parquet_test_data();
1821        let path = format!("{test_data}/int96_from_spark.parquet");
1822        let file = File::open(path).unwrap();
1823
1824        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1825            .unwrap()
1826            .build()
1827            .unwrap();
1828
1829        let batch = record_reader.next().unwrap().unwrap();
1830        assert_eq!(batch.num_columns(), 1);
1831        let column = batch.column(0);
1832        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1833
1834        let expected = Arc::new(Int64Array::from(vec![
1835            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1836            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1837            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1838            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1839            None,
1840            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1841        ]));
1842
1843        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1844        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1845        // anyway, so this should be a zero-copy non-modifying cast.
1846
1847        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1848        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1849
1850        assert_eq!(casted_timestamps.len(), expected.len());
1851
1852        casted_timestamps
1853            .iter()
1854            .zip(expected.iter())
1855            .for_each(|(lhs, rhs)| {
1856                assert_eq!(lhs, rhs);
1857            });
1858    }
1859
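    /// Generates random UTF-8 byte arrays by formatting random i32 values as strings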
1860    struct RandUtf8Gen {}
1861
1862    impl RandGen<ByteArrayType> for RandUtf8Gen {
1863        fn r#gen(len: i32) -> ByteArray {
1864            Int32Type::r#gen(len).to_string().as_str().into()
1865        }
1866    }
1867
1868    #[test]
1869    fn test_utf8_single_column_reader_test() {
1870        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1871            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1872                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1873            })))
1874        }
1875
1876        let encodings = &[
1877            Encoding::PLAIN,
1878            Encoding::RLE_DICTIONARY,
1879            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1880            Encoding::DELTA_BYTE_ARRAY,
1881        ];
1882
1883        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1884            2,
1885            ConvertedType::NONE,
1886            None,
1887            |vals| {
1888                Arc::new(BinaryArray::from_iter(
1889                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1890                ))
1891            },
1892            encodings,
1893        );
1894
1895        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1896            2,
1897            ConvertedType::UTF8,
1898            None,
1899            string_converter::<i32>,
1900            encodings,
1901        );
1902
1903        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1904            2,
1905            ConvertedType::UTF8,
1906            Some(ArrowDataType::Utf8),
1907            string_converter::<i32>,
1908            encodings,
1909        );
1910
1911        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1912            2,
1913            ConvertedType::UTF8,
1914            Some(ArrowDataType::LargeUtf8),
1915            string_converter::<i64>,
1916            encodings,
1917        );
1918
1919        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1920        for key in &small_key_types {
1921            for encoding in encodings {
1922                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1923                opts.encoding = *encoding;
1924
1925                let data_type =
1926                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1927
1928                // Cannot run full test suite as keys overflow, run small test instead
1929                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1930                    opts,
1931                    2,
1932                    ConvertedType::UTF8,
1933                    Some(data_type.clone()),
1934                    move |vals| {
1935                        let vals = string_converter::<i32>(vals);
1936                        arrow::compute::cast(&vals, &data_type).unwrap()
1937                    },
1938                );
1939            }
1940        }
1941
1942        let key_types = [
1943            ArrowDataType::Int16,
1944            ArrowDataType::UInt16,
1945            ArrowDataType::Int32,
1946            ArrowDataType::UInt32,
1947            ArrowDataType::Int64,
1948            ArrowDataType::UInt64,
1949        ];
1950
1951        for key in &key_types {
1952            let data_type =
1953                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1954
1955            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1956                2,
1957                ConvertedType::UTF8,
1958                Some(data_type.clone()),
1959                move |vals| {
1960                    let vals = string_converter::<i32>(vals);
1961                    arrow::compute::cast(&vals, &data_type).unwrap()
1962                },
1963                encodings,
1964            );
1965
1966            // https://github.com/apache/arrow-rs/issues/1179
1967            // let data_type = ArrowDataType::Dictionary(
1968            //     Box::new(key.clone()),
1969            //     Box::new(ArrowDataType::LargeUtf8),
1970            // );
1971            //
1972            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1973            //     2,
1974            //     ConvertedType::UTF8,
1975            //     Some(data_type.clone()),
1976            //     move |vals| {
1977            //         let vals = string_converter::<i64>(vals);
1978            //         arrow::compute::cast(&vals, &data_type).unwrap()
1979            //     },
1980            //     encodings,
1981            // );
1982        }
1983    }
1984
1985    #[test]
1986    fn test_decimal_nullable_struct() {
1987        let decimals = Decimal256Array::from_iter_values(
1988            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1989        );
1990
1991        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1992            "decimals",
1993            decimals.data_type().clone(),
1994            false,
1995        )])))
1996        .len(8)
1997        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1998        .child_data(vec![decimals.into_data()])
1999        .build()
2000        .unwrap();
2001
2002        let written =
2003            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2004                .unwrap();
2005
2006        let mut buffer = Vec::with_capacity(1024);
2007        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2008        writer.write(&written).unwrap();
2009        writer.close().unwrap();
2010
2011        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2012            .unwrap()
2013            .collect::<Result<Vec<_>, _>>()
2014            .unwrap();
2015
2016        assert_eq!(&written.slice(0, 3), &read[0]);
2017        assert_eq!(&written.slice(3, 3), &read[1]);
2018        assert_eq!(&written.slice(6, 2), &read[2]);
2019    }
2020
2021    #[test]
2022    fn test_int32_nullable_struct() {
2023        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2024        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2025            "int32",
2026            int32.data_type().clone(),
2027            false,
2028        )])))
2029        .len(8)
2030        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2031        .child_data(vec![int32.into_data()])
2032        .build()
2033        .unwrap();
2034
2035        let written =
2036            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2037                .unwrap();
2038
2039        let mut buffer = Vec::with_capacity(1024);
2040        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2041        writer.write(&written).unwrap();
2042        writer.close().unwrap();
2043
2044        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2045            .unwrap()
2046            .collect::<Result<Vec<_>, _>>()
2047            .unwrap();
2048
2049        assert_eq!(&written.slice(0, 3), &read[0]);
2050        assert_eq!(&written.slice(3, 3), &read[1]);
2051        assert_eq!(&written.slice(6, 2), &read[2]);
2052    }
2053
2054    #[test]
2055    fn test_decimal_list() {
2056        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2057
2058        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2059        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2060            decimals.data_type().clone(),
2061            false,
2062        ))))
2063        .len(7)
2064        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2065        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2066        .child_data(vec![decimals.into_data()])
2067        .build()
2068        .unwrap();
2069
2070        let written =
2071            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2072                .unwrap();
2073
2074        let mut buffer = Vec::with_capacity(1024);
2075        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2076        writer.write(&written).unwrap();
2077        writer.close().unwrap();
2078
2079        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2080            .unwrap()
2081            .collect::<Result<Vec<_>, _>>()
2082            .unwrap();
2083
2084        assert_eq!(&written.slice(0, 3), &read[0]);
2085        assert_eq!(&written.slice(3, 3), &read[1]);
2086        assert_eq!(&written.slice(6, 1), &read[2]);
2087    }
2088
2089    #[test]
2090    fn test_read_decimal_file() {
2091        use arrow_array::Decimal128Array;
2092        let testdata = arrow::util::test_util::parquet_test_data();
2093        let file_variants = vec![
2094            ("byte_array", 4),
2095            ("fixed_length", 25),
2096            ("int32", 4),
2097            ("int64", 10),
2098        ];
2099        for (prefix, target_precision) in file_variants {
2100            let path = format!("{testdata}/{prefix}_decimal.parquet");
2101            let file = File::open(path).unwrap();
2102            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2103
2104            let batch = record_reader.next().unwrap().unwrap();
2105            assert_eq!(batch.num_rows(), 24);
2106            let col = batch
2107                .column(0)
2108                .as_any()
2109                .downcast_ref::<Decimal128Array>()
2110                .unwrap();
2111
2112            let expected = 1..25;
2113
2114            assert_eq!(col.precision(), target_precision);
2115            assert_eq!(col.scale(), 2);
2116
2117            for (i, v) in expected.enumerate() {
2118                assert_eq!(col.value(i), v * 100_i128);
2119            }
2120        }
2121    }
2122
2123    #[test]
2124    fn test_read_float16_nonzeros_file() {
2125        use arrow_array::Float16Array;
2126        let testdata = arrow::util::test_util::parquet_test_data();
2127        // see https://github.com/apache/parquet-testing/pull/40
2128        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2129        let file = File::open(path).unwrap();
2130        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2131
2132        let batch = record_reader.next().unwrap().unwrap();
2133        assert_eq!(batch.num_rows(), 8);
2134        let col = batch
2135            .column(0)
2136            .as_any()
2137            .downcast_ref::<Float16Array>()
2138            .unwrap();
2139
2140        let f16_two = f16::ONE + f16::ONE;
2141
2142        assert_eq!(col.null_count(), 1);
2143        assert!(col.is_null(0));
2144        assert_eq!(col.value(1), f16::ONE);
2145        assert_eq!(col.value(2), -f16_two);
2146        assert!(col.value(3).is_nan());
2147        assert_eq!(col.value(4), f16::ZERO);
2148        assert!(col.value(4).is_sign_positive());
2149        assert_eq!(col.value(5), f16::NEG_ONE);
2150        assert_eq!(col.value(6), f16::NEG_ZERO);
2151        assert!(col.value(6).is_sign_negative());
2152        assert_eq!(col.value(7), f16_two);
2153    }
2154
2155    #[test]
2156    fn test_read_float16_zeros_file() {
2157        use arrow_array::Float16Array;
2158        let testdata = arrow::util::test_util::parquet_test_data();
2159        // see https://github.com/apache/parquet-testing/pull/40
2160        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2161        let file = File::open(path).unwrap();
2162        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2163
2164        let batch = record_reader.next().unwrap().unwrap();
2165        assert_eq!(batch.num_rows(), 3);
2166        let col = batch
2167            .column(0)
2168            .as_any()
2169            .downcast_ref::<Float16Array>()
2170            .unwrap();
2171
2172        assert_eq!(col.null_count(), 1);
2173        assert!(col.is_null(0));
2174        assert_eq!(col.value(1), f16::ZERO);
2175        assert!(col.value(1).is_sign_positive());
2176        assert!(col.value(2).is_nan());
2177    }
2178
2179    #[test]
2180    fn test_read_float32_float64_byte_stream_split() {
2181        let path = format!(
2182            "{}/byte_stream_split.zstd.parquet",
2183            arrow::util::test_util::parquet_test_data(),
2184        );
2185        let file = File::open(path).unwrap();
2186        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2187
2188        let mut row_count = 0;
2189        for batch in record_reader {
2190            let batch = batch.unwrap();
2191            row_count += batch.num_rows();
2192            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2193            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2194
2195            // This file contains floats from a standard normal distribution
2196            for &x in f32_col.values() {
2197                assert!(x > -10.0);
2198                assert!(x < 10.0);
2199            }
2200            for &x in f64_col.values() {
2201                assert!(x > -10.0);
2202                assert!(x < 10.0);
2203            }
2204        }
2205        assert_eq!(row_count, 300);
2206    }
2207
2208    #[test]
2209    fn test_read_extended_byte_stream_split() {
2210        let path = format!(
2211            "{}/byte_stream_split_extended.gzip.parquet",
2212            arrow::util::test_util::parquet_test_data(),
2213        );
2214        let file = File::open(path).unwrap();
2215        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2216
2217        let mut row_count = 0;
2218        for batch in record_reader {
2219            let batch = batch.unwrap();
2220            row_count += batch.num_rows();
2221
2222            // 0,1 are f16
2223            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2224            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2225            assert_eq!(f16_col.len(), f16_bss.len());
2226            f16_col
2227                .iter()
2228                .zip(f16_bss.iter())
2229                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2230
2231            // 2,3 are f32
2232            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2233            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2234            assert_eq!(f32_col.len(), f32_bss.len());
2235            f32_col
2236                .iter()
2237                .zip(f32_bss.iter())
2238                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2239
2240            // 4,5 are f64
2241            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2242            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2243            assert_eq!(f64_col.len(), f64_bss.len());
2244            f64_col
2245                .iter()
2246                .zip(f64_bss.iter())
2247                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2248
2249            // 6,7 are i32
2250            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2251            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2252            assert_eq!(i32_col.len(), i32_bss.len());
2253            i32_col
2254                .iter()
2255                .zip(i32_bss.iter())
2256                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2257
2258            // 8,9 are i64
2259            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2260            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2261            assert_eq!(i64_col.len(), i64_bss.len());
2262            i64_col
2263                .iter()
2264                .zip(i64_bss.iter())
2265                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2266
2267            // 10,11 are FLBA(5)
2268            let flba_col = batch.column(10).as_fixed_size_binary();
2269            let flba_bss = batch.column(11).as_fixed_size_binary();
2270            assert_eq!(flba_col.len(), flba_bss.len());
2271            flba_col
2272                .iter()
2273                .zip(flba_bss.iter())
2274                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2275
2276            // 12,13 are FLBA(4) (decimal(7,3))
2277            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2278            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2279            assert_eq!(dec_col.len(), dec_bss.len());
2280            dec_col
2281                .iter()
2282                .zip(dec_bss.iter())
2283                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2284        }
2285        assert_eq!(row_count, 200);
2286    }
2287
2288    #[test]
2289    fn test_read_incorrect_map_schema_file() {
2290        let testdata = arrow::util::test_util::parquet_test_data();
2291        // see https://github.com/apache/parquet-testing/pull/47
2292        let path = format!("{testdata}/incorrect_map_schema.parquet");
2293        let file = File::open(path).unwrap();
2294        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2295
2296        let batch = record_reader.next().unwrap().unwrap();
2297        assert_eq!(batch.num_rows(), 1);
2298
2299        let expected_schema = Schema::new(vec![Field::new(
2300            "my_map",
2301            ArrowDataType::Map(
2302                Arc::new(Field::new(
2303                    "key_value",
2304                    ArrowDataType::Struct(Fields::from(vec![
2305                        Field::new("key", ArrowDataType::Utf8, false),
2306                        Field::new("value", ArrowDataType::Utf8, true),
2307                    ])),
2308                    false,
2309                )),
2310                false,
2311            ),
2312            true,
2313        )]);
2314        assert_eq!(batch.schema().as_ref(), &expected_schema);
2315
2316        assert_eq!(batch.num_rows(), 1);
2317        assert_eq!(batch.column(0).null_count(), 0);
2318        assert_eq!(
2319            batch.column(0).as_map().keys().as_ref(),
2320            &StringArray::from(vec!["parent", "name"])
2321        );
2322        assert_eq!(
2323            batch.column(0).as_map().values().as_ref(),
2324            &StringArray::from(vec!["another", "report"])
2325        );
2326    }
2327
2328    #[test]
2329    fn test_read_dict_fixed_size_binary() {
2330        let schema = Arc::new(Schema::new(vec![Field::new(
2331            "a",
2332            ArrowDataType::Dictionary(
2333                Box::new(ArrowDataType::UInt8),
2334                Box::new(ArrowDataType::FixedSizeBinary(8)),
2335            ),
2336            true,
2337        )]));
2338        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2339        let values = FixedSizeBinaryArray::try_from_iter(
2340            vec![
2341                (0u8..8u8).collect::<Vec<u8>>(),
2342                (24u8..32u8).collect::<Vec<u8>>(),
2343            ]
2344            .into_iter(),
2345        )
2346        .unwrap();
2347        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2348        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2349
2350        let mut buffer = Vec::with_capacity(1024);
2351        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2352        writer.write(&batch).unwrap();
2353        writer.close().unwrap();
2354        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2355            .unwrap()
2356            .collect::<Result<Vec<_>, _>>()
2357            .unwrap();
2358
2359        assert_eq!(read.len(), 1);
2360        assert_eq!(&batch, &read[0])
2361    }
2362
2363    /// Parameters for single_column_reader_test
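    ///
    /// For example, a hypothetical configuration (not tied to any specific test
    /// below) that writes 2 row groups of 100 rows each and reads them back in
    /// batches of 15 rows with 25% nulls:
    /// `TestOptions::new(2, 100, 15).with_null_percent(25)`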
2364    #[derive(Clone)]
2365    struct TestOptions {
2366        /// Number of row groups to write to parquet (total rows written =
2367        /// num_row_groups * num_rows)
2368        num_row_groups: usize,
2369        /// Number of rows in each row group
2370        num_rows: usize,
2371        /// Size of batches to read back
2372        record_batch_size: usize,
2373        /// Percentage of nulls in the column, or None if the column is required (never null)
2374        null_percent: Option<usize>,
2375        /// Set write batch size
2376        ///
2377        /// This is the number of rows that are written at once to a page and
2378        /// therefore acts as a bound on the page granularity of a row group
2379        write_batch_size: usize,
2380        /// Maximum size of page in bytes
2381        max_data_page_size: usize,
2382        /// Maximum size of dictionary page in bytes
2383        max_dict_page_size: usize,
2384        /// Writer version
2385        writer_version: WriterVersion,
2386        /// Enabled statistics
2387        enabled_statistics: EnabledStatistics,
2388        /// Encoding
2389        encoding: Encoding,
2390        /// Row selections and total selected row count
2391        row_selections: Option<(RowSelection, usize)>,
2392        /// Row filter mask, applied to the rows that remain after any row selection
2393        row_filter: Option<Vec<bool>>,
2394        /// Limit on the number of rows to read
2395        limit: Option<usize>,
2396        /// Number of rows to skip before reading
2397        offset: Option<usize>,
2398    }
2399
2400    /// Manually implement `Debug` to avoid printing the entire contents of `row_selections` and `row_filter`
2401    impl std::fmt::Debug for TestOptions {
2402        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2403            f.debug_struct("TestOptions")
2404                .field("num_row_groups", &self.num_row_groups)
2405                .field("num_rows", &self.num_rows)
2406                .field("record_batch_size", &self.record_batch_size)
2407                .field("null_percent", &self.null_percent)
2408                .field("write_batch_size", &self.write_batch_size)
2409                .field("max_data_page_size", &self.max_data_page_size)
2410                .field("max_dict_page_size", &self.max_dict_page_size)
2411                .field("writer_version", &self.writer_version)
2412                .field("enabled_statistics", &self.enabled_statistics)
2413                .field("encoding", &self.encoding)
2414                .field("row_selections", &self.row_selections.is_some())
2415                .field("row_filter", &self.row_filter.is_some())
2416                .field("limit", &self.limit)
2417                .field("offset", &self.offset)
2418                .finish()
2419        }
2420    }
2421
2422    impl Default for TestOptions {
2423        fn default() -> Self {
2424            Self {
2425                num_row_groups: 2,
2426                num_rows: 100,
2427                record_batch_size: 15,
2428                null_percent: None,
2429                write_batch_size: 64,
2430                max_data_page_size: 1024 * 1024,
2431                max_dict_page_size: 1024 * 1024,
2432                writer_version: WriterVersion::PARQUET_1_0,
2433                enabled_statistics: EnabledStatistics::Page,
2434                encoding: Encoding::PLAIN,
2435                row_selections: None,
2436                row_filter: None,
2437                limit: None,
2438                offset: None,
2439            }
2440        }
2441    }
2442
2443    impl TestOptions {
2444        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2445            Self {
2446                num_row_groups,
2447                num_rows,
2448                record_batch_size,
2449                ..Default::default()
2450            }
2451        }
2452
2453        fn with_null_percent(self, null_percent: usize) -> Self {
2454            Self {
2455                null_percent: Some(null_percent),
2456                ..self
2457            }
2458        }
2459
2460        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2461            Self {
2462                max_data_page_size,
2463                ..self
2464            }
2465        }
2466
2467        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2468            Self {
2469                max_dict_page_size,
2470                ..self
2471            }
2472        }
2473
2474        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2475            Self {
2476                enabled_statistics,
2477                ..self
2478            }
2479        }
2480
2481        fn with_row_selections(self) -> Self {
2482            assert!(self.row_filter.is_none(), "Must set row selection first");
2483
2484            let mut rng = rng();
2485            let step = rng.random_range(self.record_batch_size..self.num_rows);
2486            let row_selections = create_test_selection(
2487                step,
2488                self.num_row_groups * self.num_rows,
2489                rng.random::<bool>(),
2490            );
2491            Self {
2492                row_selections: Some(row_selections),
2493                ..self
2494            }
2495        }
2496
2497        fn with_row_filter(self) -> Self {
2498            let row_count = match &self.row_selections {
2499                Some((_, count)) => *count,
2500                None => self.num_row_groups * self.num_rows,
2501            };
2502
2503            let mut rng = rng();
2504            Self {
2505                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2506                ..self
2507            }
2508        }
2509
2510        fn with_limit(self, limit: usize) -> Self {
2511            Self {
2512                limit: Some(limit),
2513                ..self
2514            }
2515        }
2516
2517        fn with_offset(self, offset: usize) -> Self {
2518            Self {
2519                offset: Some(offset),
2520                ..self
2521            }
2522        }
2523
2524        fn writer_props(&self) -> WriterProperties {
2525            let builder = WriterProperties::builder()
2526                .set_data_page_size_limit(self.max_data_page_size)
2527                .set_write_batch_size(self.write_batch_size)
2528                .set_writer_version(self.writer_version)
2529                .set_statistics_enabled(self.enabled_statistics);
2530
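            // Dictionary encodings are exercised by enabling the dictionary and
            // bounding its page size; every other encoding is set explicitly on
            // the columns with the dictionary disabled, so the requested
            // encoding is actually used for the data pages.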
2531            let builder = match self.encoding {
2532                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2533                    .set_dictionary_enabled(true)
2534                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2535                _ => builder
2536                    .set_dictionary_enabled(false)
2537                    .set_encoding(self.encoding),
2538            };
2539
2540            builder.build()
2541        }
2542    }
2543
2544    /// Create a parquet file and then read it back using
2545    /// `ParquetRecordBatchReader`, exercising a standard set of
2546    /// `TestOptions`.
2547    ///
2548    /// `rand_max` represents the maximum size of value to pass to the
2549    /// value generator
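    ///
    /// A typical invocation, mirroring the Int32 test above (the exact set of
    /// encodings passed varies per test):
    ///
    /// ```ignore
    /// run_single_column_reader_tests::<Int32Type, _, Int32Type>(
    ///     2,
    ///     ConvertedType::NONE,
    ///     None,
    ///     |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
    ///     &[Encoding::PLAIN],
    /// );
    /// ```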
2550    fn run_single_column_reader_tests<T, F, G>(
2551        rand_max: i32,
2552        converted_type: ConvertedType,
2553        arrow_type: Option<ArrowDataType>,
2554        converter: F,
2555        encodings: &[Encoding],
2556    ) where
2557        T: DataType,
2558        G: RandGen<T>,
2559        F: Fn(&[Option<T::T>]) -> ArrayRef,
2560    {
2561        let all_options = vec![
2562            // choose record_batch_size (15) so batches cross row
2563            // group boundaries (2 row groups of 100 rows each).
2564            TestOptions::new(2, 100, 15),
2565            // choose record_batch_size (5) so batches sometimes fall
2566            // on row group boundaries (3 row groups of 25 rows
2567            // each). Tests buffer
2568            // refilling edge cases.
2569            TestOptions::new(3, 25, 5),
2570            // Choose record_batch_size (25) so batch boundaries align
2571            // exactly with row group boundaries. Tests buffer
2572            // refilling edge cases.
2573            TestOptions::new(4, 100, 25),
2574            // Set maximum page size so row groups have multiple pages
2575            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2576            // Set small dictionary page size to test dictionary fallback
2577            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2578            // Test optional but with no nulls
2579            TestOptions::new(2, 256, 127).with_null_percent(0),
2580            // Test optional with nulls
2581            TestOptions::new(2, 256, 93).with_null_percent(25),
2582            // Test with limit of 0
2583            TestOptions::new(4, 100, 25).with_limit(0),
2584            // Test with limit of 50
2585            TestOptions::new(4, 100, 25).with_limit(50),
2586            // Test with limit of 10 (smaller than a single record batch)
2587            TestOptions::new(4, 100, 25).with_limit(10),
2588            // Test with limit larger than the number of rows in a row group
2589            TestOptions::new(4, 100, 25).with_limit(101),
2590            // Test with limit + offset smaller than the number of rows in a row group
2591            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2592            // Test with limit + offset equal to the number of rows in a row group
2593            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2594            // Test with limit + offset larger than the number of rows in a row group
2595            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2596            // Test with no page-level statistics
2597            TestOptions::new(2, 256, 91)
2598                .with_null_percent(25)
2599                .with_enabled_statistics(EnabledStatistics::Chunk),
2600            // Test with no statistics
2601            TestOptions::new(2, 256, 91)
2602                .with_null_percent(25)
2603                .with_enabled_statistics(EnabledStatistics::None),
2604            // Test with all null
2605            TestOptions::new(2, 128, 91)
2606                .with_null_percent(100)
2607                .with_enabled_statistics(EnabledStatistics::None),
2608            // Test skip
2609
2610            // choose record_batch_size (15) so batches cross row
2611            // group boundaries (2 row groups of 100 rows each).
2612            TestOptions::new(2, 100, 15).with_row_selections(),
2613            // choose record_batch_size (5) so batches sometimes fall
2614            // on row group boundaries (3 row groups of 25 rows
2615            // each). Tests buffer
2616            // refilling edge cases.
2617            TestOptions::new(3, 25, 5).with_row_selections(),
2618            // Choose record_batch_size (25) so batch boundaries align
2619            // exactly with row group boundaries. Tests buffer
2620            // refilling edge cases.
2621            TestOptions::new(4, 100, 25).with_row_selections(),
2622            // Set maximum page size so row groups have multiple pages
2623            TestOptions::new(3, 256, 73)
2624                .with_max_data_page_size(128)
2625                .with_row_selections(),
2626            // Set small dictionary page size to test dictionary fallback
2627            TestOptions::new(3, 256, 57)
2628                .with_max_dict_page_size(128)
2629                .with_row_selections(),
2630            // Test optional but with no nulls
2631            TestOptions::new(2, 256, 127)
2632                .with_null_percent(0)
2633                .with_row_selections(),
2634            // Test optional with nulls
2635            TestOptions::new(2, 256, 93)
2636                .with_null_percent(25)
2637                .with_row_selections(),
2638            // Test optional with nulls
2639            TestOptions::new(2, 256, 93)
2640                .with_null_percent(25)
2641                .with_row_selections()
2642                .with_limit(10),
2643            // Test optional with nulls
2644            TestOptions::new(2, 256, 93)
2645                .with_null_percent(25)
2646                .with_row_selections()
2647                .with_offset(20)
2648                .with_limit(10),
2649            // Test filter
2650
2651            // Test with row filter
2652            TestOptions::new(4, 100, 25).with_row_filter(),
2653            // Test with row selection and row filter
2654            TestOptions::new(4, 100, 25)
2655                .with_row_selections()
2656                .with_row_filter(),
2657            // Test with nulls and row filter
2658            TestOptions::new(2, 256, 93)
2659                .with_null_percent(25)
2660                .with_max_data_page_size(10)
2661                .with_row_filter(),
2662            // Test with nulls and row filter and small pages
2663            TestOptions::new(2, 256, 93)
2664                .with_null_percent(25)
2665                .with_max_data_page_size(10)
2666                .with_row_selections()
2667                .with_row_filter(),
2668            // Test with row selection and no offset index and small pages
2669            TestOptions::new(2, 256, 93)
2670                .with_enabled_statistics(EnabledStatistics::None)
2671                .with_max_data_page_size(10)
2672                .with_row_selections(),
2673        ];
2674
2675        all_options.into_iter().for_each(|opts| {
2676            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2677                for encoding in encodings {
2678                    let opts = TestOptions {
2679                        writer_version,
2680                        encoding: *encoding,
2681                        ..opts.clone()
2682                    };
2683
2684                    single_column_reader_test::<T, _, G>(
2685                        opts,
2686                        rand_max,
2687                        converted_type,
2688                        arrow_type.clone(),
2689                        &converter,
2690                    )
2691                }
2692            }
2693        });
2694    }
2695
2696    /// Create a parquet file and then read it back using
2697    /// `ParquetRecordBatchReader` with the parameters described in
2698    /// `opts`.
2699    fn single_column_reader_test<T, F, G>(
2700        opts: TestOptions,
2701        rand_max: i32,
2702        converted_type: ConvertedType,
2703        arrow_type: Option<ArrowDataType>,
2704        converter: F,
2705    ) where
2706        T: DataType,
2707        G: RandGen<T>,
2708        F: Fn(&[Option<T::T>]) -> ArrayRef,
2709    {
2710        // Print out options to facilitate debugging failures on CI
2711        println!(
2712            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2713            T::get_physical_type(),
2714            converted_type,
2715            arrow_type,
2716            opts
2717        );
2718
2719        // Generate def_levels according to null_percent
2720        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2721            Some(null_percent) => {
2722                let mut rng = rng();
2723
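                // A definition level of 1 marks a present value and 0 marks a
                // null; roughly `null_percent` percent of the generated levels
                // are 0.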
2724                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2725                    .map(|_| {
2726                        std::iter::from_fn(|| {
2727                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2728                        })
2729                        .take(opts.num_rows)
2730                        .collect()
2731                    })
2732                    .collect();
2733                (Repetition::OPTIONAL, Some(def_levels))
2734            }
2735            None => (Repetition::REQUIRED, None),
2736        };
2737
2738        // Generate random table data
2739        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2740            .map(|idx| {
2741                let null_count = match def_levels.as_ref() {
2742                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2743                    None => 0,
2744                };
2745                G::gen_vec(rand_max, opts.num_rows - null_count)
2746            })
2747            .collect();
2748
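        // Physical length of the leaf column: FIXED_LEN_BYTE_ARRAY reuses `rand_max`
        // as its byte width, INT96 is always 12 bytes, and -1 leaves the length unset
        // for the remaining physical types.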
2749        let len = match T::get_physical_type() {
2750            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2751            crate::basic::Type::INT96 => 12,
2752            _ => -1,
2753        };
2754
2755        let fields = vec![Arc::new(
2756            Type::primitive_type_builder("leaf", T::get_physical_type())
2757                .with_repetition(repetition)
2758                .with_converted_type(converted_type)
2759                .with_length(len)
2760                .build()
2761                .unwrap(),
2762        )];
2763
2764        let schema = Arc::new(
2765            Type::group_type_builder("test_schema")
2766                .with_fields(fields)
2767                .build()
2768                .unwrap(),
2769        );
2770
2771        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2772
2773        let mut file = tempfile::tempfile().unwrap();
2774
2775        generate_single_column_file_with_data::<T>(
2776            &values,
2777            def_levels.as_ref(),
2778            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2779            schema,
2780            arrow_field,
2781            &opts,
2782        )
2783        .unwrap();
2784
2785        file.rewind().unwrap();
2786
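        // Only request the page index when the file was written with page-level
        // statistics; the other configurations exercise reading without it.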
2787        let options = ArrowReaderOptions::new()
2788            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2789
2790        let mut builder =
2791            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2792
2793        let expected_data = match opts.row_selections {
2794            Some((selections, row_count)) => {
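                // Walk the selectors in order: skipped ranges are dropped from the
                // front of the flattened data and selected ranges are kept. For
                // example, [skip 2, select 3] discards the first two rows and keeps
                // the next three as expected output.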
2795                let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2796
2797                let mut selected_data: Vec<Option<T::T>> = vec![];
2798                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2799                for select in dequeue {
2800                    if select.skip {
2801                        remaining_data.drain(0..select.row_count);
2802                    } else {
2803                        selected_data.extend(remaining_data.drain(0..select.row_count));
2804                    }
2805                }
2806                builder = builder.with_row_selection(selections);
2807
2808                assert_eq!(selected_data.len(), row_count);
2809                selected_data
2810            }
2811            None => {
2812                // get the flattened expected data
2813                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2814                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2815                expected_data
2816            }
2817        };
2818
2819        let mut expected_data = match opts.row_filter {
2820            Some(filter) => {
2821                let expected_data = expected_data
2822                    .into_iter()
2823                    .zip(filter.iter())
2824                    .filter_map(|(d, f)| f.then_some(d))
2825                    .collect();
2826
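                // The predicate is invoked repeatedly with consecutive batches, so
                // remember how far into the precomputed boolean mask previous calls
                // have advanced.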
2827                let mut filter_offset = 0;
2828                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2829                    ProjectionMask::all(),
2830                    move |b| {
2831                        let array = BooleanArray::from_iter(
2832                            filter
2833                                .iter()
2834                                .skip(filter_offset)
2835                                .take(b.num_rows())
2836                                .map(|x| Some(*x)),
2837                        );
2838                        filter_offset += b.num_rows();
2839                        Ok(array)
2840                    },
2841                ))]);
2842
2843                builder = builder.with_row_filter(filter);
2844                expected_data
2845            }
2846            None => expected_data,
2847        };
2848
2849        if let Some(offset) = opts.offset {
2850            builder = builder.with_offset(offset);
2851            expected_data = expected_data.into_iter().skip(offset).collect();
2852        }
2853
2854        if let Some(limit) = opts.limit {
2855            builder = builder.with_limit(limit);
2856            expected_data = expected_data.into_iter().take(limit).collect();
2857        }
2858
2859        let mut record_reader = builder
2860            .with_batch_size(opts.record_batch_size)
2861            .build()
2862            .unwrap();
2863
2864        let mut total_read = 0;
2865        loop {
2866            let maybe_batch = record_reader.next();
2867            if total_read < expected_data.len() {
2868                let end = min(total_read + opts.record_batch_size, expected_data.len());
2869                let batch = maybe_batch.unwrap().unwrap();
2870                assert_eq!(end - total_read, batch.num_rows());
2871
2872                let a = converter(&expected_data[total_read..end]);
2873                let b = Arc::clone(batch.column(0));
2874
2875                assert_eq!(a.data_type(), b.data_type());
2876                assert_eq!(a.to_data(), b.to_data());
2877                assert_eq!(
2878                    a.as_any().type_id(),
2879                    b.as_any().type_id(),
2880                    "incorrect type ids"
2881                );
2882
2883                total_read = end;
2884            } else {
2885                assert!(maybe_batch.is_none());
2886                break;
2887            }
2888        }
2889    }
2890
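    /// Interleaves `values` with nulls according to `def_levels` (1 = value, 0 = null),
    /// flattening all row groups into a single expected column.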
2891    fn gen_expected_data<T: DataType>(
2892        def_levels: Option<&Vec<Vec<i16>>>,
2893        values: &[Vec<T::T>],
2894    ) -> Vec<Option<T::T>> {
2895        let data: Vec<Option<T::T>> = match def_levels {
2896            Some(levels) => {
2897                let mut values_iter = values.iter().flatten();
2898                levels
2899                    .iter()
2900                    .flatten()
2901                    .map(|d| match d {
2902                        1 => Some(values_iter.next().cloned().unwrap()),
2903                        0 => None,
2904                        _ => unreachable!(),
2905                    })
2906                    .collect()
2907            }
2908            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2909        };
2910        data
2911    }
2912
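    /// Writes `values` (one `Vec` per row group) to `file` with the low level column
    /// writer API, embedding the Arrow schema for `field` in the metadata when provided.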
2913    fn generate_single_column_file_with_data<T: DataType>(
2914        values: &[Vec<T::T>],
2915        def_levels: Option<&Vec<Vec<i16>>>,
2916        file: File,
2917        schema: TypePtr,
2918        field: Option<Field>,
2919        opts: &TestOptions,
2920    ) -> Result<ParquetMetaData> {
2921        let mut writer_props = opts.writer_props();
2922        if let Some(field) = field {
2923            let arrow_schema = Schema::new(vec![field]);
2924            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2925        }
2926
2927        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2928
2929        for (idx, v) in values.iter().enumerate() {
2930            let def_levels = def_levels.map(|d| d[idx].as_slice());
2931            let mut row_group_writer = writer.next_row_group()?;
2932            {
2933                let mut column_writer = row_group_writer
2934                    .next_column()?
2935                    .expect("Column writer is none!");
2936
2937                column_writer
2938                    .typed::<T>()
2939                    .write_batch(v, def_levels, None)?;
2940
2941                column_writer.close()?;
2942            }
2943            row_group_writer.close()?;
2944        }
2945
2946        writer.close()
2947    }
2948
2949    fn get_test_file(file_name: &str) -> File {
2950        let mut path = PathBuf::new();
2951        path.push(arrow::util::test_util::arrow_test_data());
2952        path.push(file_name);
2953
2954        File::open(path.as_path()).expect("File not found!")
2955    }
2956
2957    #[test]
2958    fn test_read_structs() {
2959        // This particular test file has columns of struct types where there is
2960        // a column that has the same name as one of the struct fields
2961        // (see: ARROW-11452)
2962        let testdata = arrow::util::test_util::parquet_test_data();
2963        let path = format!("{testdata}/nested_structs.rust.parquet");
2964        let file = File::open(&path).unwrap();
2965        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2966
2967        for batch in record_batch_reader {
2968            batch.unwrap();
2969        }
2970
2971        let file = File::open(&path).unwrap();
2972        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2973
2974        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2975        let projected_reader = builder
2976            .with_projection(mask)
2977            .with_batch_size(60)
2978            .build()
2979            .unwrap();
2980
2981        let expected_schema = Schema::new(vec![
2982            Field::new(
2983                "roll_num",
2984                ArrowDataType::Struct(Fields::from(vec![Field::new(
2985                    "count",
2986                    ArrowDataType::UInt64,
2987                    false,
2988                )])),
2989                false,
2990            ),
2991            Field::new(
2992                "PC_CUR",
2993                ArrowDataType::Struct(Fields::from(vec![
2994                    Field::new("mean", ArrowDataType::Int64, false),
2995                    Field::new("sum", ArrowDataType::Int64, false),
2996                ])),
2997                false,
2998            ),
2999        ]);
3000
3001        // Tests for #1652 and #1654
3002        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3003
3004        for batch in projected_reader {
3005            let batch = batch.unwrap();
3006            assert_eq!(batch.schema().as_ref(), &expected_schema);
3007        }
3008    }
3009
3010    #[test]
3011    // same as test_read_structs but constructs projection mask via column names
3012    fn test_read_structs_by_name() {
3013        let testdata = arrow::util::test_util::parquet_test_data();
3014        let path = format!("{testdata}/nested_structs.rust.parquet");
3015        let file = File::open(&path).unwrap();
3016        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3017
3018        for batch in record_batch_reader {
3019            batch.unwrap();
3020        }
3021
3022        let file = File::open(&path).unwrap();
3023        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3024
3025        let mask = ProjectionMask::columns(
3026            builder.parquet_schema(),
3027            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3028        );
3029        let projected_reader = builder
3030            .with_projection(mask)
3031            .with_batch_size(60)
3032            .build()
3033            .unwrap();
3034
3035        let expected_schema = Schema::new(vec![
3036            Field::new(
3037                "roll_num",
3038                ArrowDataType::Struct(Fields::from(vec![Field::new(
3039                    "count",
3040                    ArrowDataType::UInt64,
3041                    false,
3042                )])),
3043                false,
3044            ),
3045            Field::new(
3046                "PC_CUR",
3047                ArrowDataType::Struct(Fields::from(vec![
3048                    Field::new("mean", ArrowDataType::Int64, false),
3049                    Field::new("sum", ArrowDataType::Int64, false),
3050                ])),
3051                false,
3052            ),
3053        ]);
3054
3055        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3056
3057        for batch in projected_reader {
3058            let batch = batch.unwrap();
3059            assert_eq!(batch.schema().as_ref(), &expected_schema);
3060        }
3061    }
3062
3063    #[test]
3064    fn test_read_maps() {
3065        let testdata = arrow::util::test_util::parquet_test_data();
3066        let path = format!("{testdata}/nested_maps.snappy.parquet");
3067        let file = File::open(path).unwrap();
3068        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3069
3070        for batch in record_batch_reader {
3071            batch.unwrap();
3072        }
3073    }
3074
3075    #[test]
3076    fn test_nested_nullability() {
3077        let message_type = "message nested {
3078          OPTIONAL Group group {
3079            REQUIRED INT32 leaf;
3080          }
3081        }";
3082
3083        let file = tempfile::tempfile().unwrap();
3084        let schema = Arc::new(parse_message_type(message_type).unwrap());
3085
3086        {
3087            // Write using low-level parquet API (#1167)
3088            let mut writer =
3089                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3090                    .unwrap();
3091
3092            {
3093                let mut row_group_writer = writer.next_row_group().unwrap();
3094                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3095
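                // With an OPTIONAL group wrapping a REQUIRED leaf the maximum
                // definition level is 1: level 0 writes a null group and level 1
                // writes a leaf value, so the def levels below yield 4 rows with 2 nulls.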
3096                column_writer
3097                    .typed::<Int32Type>()
3098                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3099                    .unwrap();
3100
3101                column_writer.close().unwrap();
3102                row_group_writer.close().unwrap();
3103            }
3104
3105            writer.close().unwrap();
3106        }
3107
3108        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3109        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3110
3111        let reader = builder.with_projection(mask).build().unwrap();
3112
3113        let expected_schema = Schema::new(vec![Field::new(
3114            "group",
3115            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3116            true,
3117        )]);
3118
3119        let batch = reader.into_iter().next().unwrap().unwrap();
3120        assert_eq!(batch.schema().as_ref(), &expected_schema);
3121        assert_eq!(batch.num_rows(), 4);
3122        assert_eq!(batch.column(0).null_count(), 2);
3123    }
3124
3125    #[test]
3126    fn test_invalid_utf8() {
3127        // a parquet file with a single column containing invalid UTF-8 data
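        // the column values are the bytes of "he\xFFllo", where 0xFF is never valid UTF-8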
3128        let data = vec![
3129            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3130            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3131            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3132            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3133            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3134            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3135            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3136            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3137            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3138            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3139            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3140            82, 49,
3141        ];
3142
3143        let file = Bytes::from(data);
3144        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3145
3146        let error = record_batch_reader.next().unwrap().unwrap_err();
3147
3148        assert!(
3149            error.to_string().contains("invalid utf-8 sequence"),
3150            "{}",
3151            error
3152        );
3153    }
3154
3155    #[test]
3156    fn test_invalid_utf8_string_array() {
3157        test_invalid_utf8_string_array_inner::<i32>();
3158    }
3159
3160    #[test]
3161    fn test_invalid_utf8_large_string_array() {
3162        test_invalid_utf8_string_array_inner::<i64>();
3163    }
3164
3165    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3166        let cases = [
3167            invalid_utf8_first_char::<O>(),
3168            invalid_utf8_first_char_long_strings::<O>(),
3169            invalid_utf8_later_char::<O>(),
3170            invalid_utf8_later_char_long_strings::<O>(),
3171            invalid_utf8_later_char_really_long_strings::<O>(),
3172            invalid_utf8_later_char_really_long_strings2::<O>(),
3173        ];
3174        for array in &cases {
3175            for encoding in STRING_ENCODINGS {
3176                // the data is not valid UTF-8, so we cannot safely construct a correct
3177                // StringArray; purposely create an invalid StringArray instead
3178                let array = unsafe {
3179                    GenericStringArray::<O>::new_unchecked(
3180                        array.offsets().clone(),
3181                        array.values().clone(),
3182                        array.nulls().cloned(),
3183                    )
3184                };
3185                let data_type = array.data_type().clone();
3186                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3187                let err = read_from_parquet(data).unwrap_err();
3188                let expected_err =
3189                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3190                assert!(
3191                    err.to_string().contains(expected_err),
3192                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3193                );
3194            }
3195        }
3196    }
3197
3198    #[test]
3199    fn test_invalid_utf8_string_view_array() {
3200        let cases = [
3201            invalid_utf8_first_char::<i32>(),
3202            invalid_utf8_first_char_long_strings::<i32>(),
3203            invalid_utf8_later_char::<i32>(),
3204            invalid_utf8_later_char_long_strings::<i32>(),
3205            invalid_utf8_later_char_really_long_strings::<i32>(),
3206            invalid_utf8_later_char_really_long_strings2::<i32>(),
3207        ];
3208
3209        for encoding in STRING_ENCODINGS {
3210            for array in &cases {
3211                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3212                let array = array.as_binary_view();
3213
3214                // the data is not valid UTF-8, so we cannot safely construct a correct
3215                // StringViewArray; purposely create an invalid StringViewArray instead
3216                let array = unsafe {
3217                    StringViewArray::new_unchecked(
3218                        array.views().clone(),
3219                        array.data_buffers().to_vec(),
3220                        array.nulls().cloned(),
3221                    )
3222                };
3223
3224                let data_type = array.data_type().clone();
3225                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3226                let err = read_from_parquet(data).unwrap_err();
3227                let expected_err =
3228                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3229                assert!(
3230                    err.to_string().contains(expected_err),
3231                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3232                );
3233            }
3234        }
3235    }
3236
3237    /// Encodings suitable for string data
3238    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3239        None,
3240        Some(Encoding::PLAIN),
3241        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3242        Some(Encoding::DELTA_BYTE_ARRAY),
3243    ];
3244
3245    /// Invalid UTF-8 sequence in the first character
3246    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3247    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3248
3249    /// Invalid UTF-8 sequence in a character other than the first
3250    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3251    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3252
3253    /// returns a BinaryArray with invalid UTF8 data in the first character
3254    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3255        let valid: &[u8] = b"   ";
3256        let invalid = INVALID_UTF8_FIRST_CHAR;
3257        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3258    }
3259
3260    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3261    /// string larger than 12 bytes which is handled specially when reading
3262    /// `ByteViewArray`s
3263    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3264        let valid: &[u8] = b"   ";
3265        let mut invalid = vec![];
3266        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3267        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3268        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3269    }
3270
3271    /// returns a BinaryArray with invalid UTF8 data in a character other than
3272    /// the first (this is checked in a special codepath)
3273    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3274        let valid: &[u8] = b"   ";
3275        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3276        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3277    }
3278
3279    /// returns a BinaryArray with invalid UTF8 data in a character other than
3280    /// the first in a string larger than 12 bytes which is handled specially
3281    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3282    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3283        let valid: &[u8] = b"   ";
3284        let mut invalid = vec![];
3285        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3286        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3287        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3288    }
3289
3290    /// returns a BinaryArray with invalid UTF8 data in a character other than
3291    /// the first in a string larger than 128 bytes which is handled specially
3292    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3293    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3294        let valid: &[u8] = b"   ";
3295        let mut invalid = vec![];
3296        for _ in 0..10 {
3297            // each instance is 38 bytes
3298            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3299        }
3300        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3301        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3302    }
3303
3304    /// returns a BinaryArray with a small string containing invalid UTF8 data in a
3305    /// character other than the first, followed by a valid string larger than 128 bytes
3306    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3307        let valid: &[u8] = b"   ";
3308        let mut valid_long = vec![];
3309        for _ in 0..10 {
3310            // each instance is 38 bytes
3311            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3312        }
3313        let invalid = INVALID_UTF8_LATER_CHAR;
3314        GenericBinaryArray::<O>::from_iter(vec![
3315            None,
3316            Some(valid),
3317            Some(invalid),
3318            None,
3319            Some(&valid_long),
3320            Some(valid),
3321        ])
3322    }
3323
3324    /// writes the array into a single column parquet file with the specified
3325    /// encoding.
3326    ///
3327    /// If no encoding is specified, use default (dictionary) encoding
3328    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3329        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3330        let mut data = vec![];
3331        let schema = batch.schema();
3332        let props = encoding.map(|encoding| {
3333            WriterProperties::builder()
3334                // must disable dictionary encoding for the requested encoding to take effect
3335                .set_dictionary_enabled(false)
3336                .set_encoding(encoding)
3337                .build()
3338        });
3339
3340        {
3341            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3342            writer.write(&batch).unwrap();
3343            writer.flush().unwrap();
3344            writer.close().unwrap();
3345        };
3346        data
3347    }
3348
3349    /// read the parquet file into a record batch
3350    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3351        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3352            .unwrap()
3353            .build()
3354            .unwrap();
3355
3356        reader.collect()
3357    }
3358
3359    #[test]
3360    fn test_dictionary_preservation() {
3361        let fields = vec![Arc::new(
3362            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3363                .with_repetition(Repetition::OPTIONAL)
3364                .with_converted_type(ConvertedType::UTF8)
3365                .build()
3366                .unwrap(),
3367        )];
3368
3369        let schema = Arc::new(
3370            Type::group_type_builder("test_schema")
3371                .with_fields(fields)
3372                .build()
3373                .unwrap(),
3374        );
3375
3376        let dict_type = ArrowDataType::Dictionary(
3377            Box::new(ArrowDataType::Int32),
3378            Box::new(ArrowDataType::Utf8),
3379        );
3380
3381        let arrow_field = Field::new("leaf", dict_type, true);
3382
3383        let mut file = tempfile::tempfile().unwrap();
3384
3385        let values = vec![
3386            vec![
3387                ByteArray::from("hello"),
3388                ByteArray::from("a"),
3389                ByteArray::from("b"),
3390                ByteArray::from("d"),
3391            ],
3392            vec![
3393                ByteArray::from("c"),
3394                ByteArray::from("a"),
3395                ByteArray::from("b"),
3396            ],
3397        ];
3398
3399        let def_levels = vec![
3400            vec![1, 0, 0, 1, 0, 0, 1, 1],
3401            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3402        ];
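        // Row group 1 holds 8 rows (4 values + 4 nulls) and row group 2 holds 9 rows
        // (3 values + 6 nulls); reading the 17 rows back with a batch size of 3
        // produces the 6 batches asserted below.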
3403
3404        let opts = TestOptions {
3405            encoding: Encoding::RLE_DICTIONARY,
3406            ..Default::default()
3407        };
3408
3409        generate_single_column_file_with_data::<ByteArrayType>(
3410            &values,
3411            Some(&def_levels),
3412            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3413            schema,
3414            Some(arrow_field),
3415            &opts,
3416        )
3417        .unwrap();
3418
3419        file.rewind().unwrap();
3420
3421        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3422
3423        let batches = record_reader
3424            .collect::<Result<Vec<RecordBatch>, _>>()
3425            .unwrap();
3426
3427        assert_eq!(batches.len(), 6);
3428        assert!(batches.iter().all(|x| x.num_columns() == 1));
3429
3430        let row_counts = batches
3431            .iter()
3432            .map(|x| (x.num_rows(), x.column(0).null_count()))
3433            .collect::<Vec<_>>();
3434
3435        assert_eq!(
3436            row_counts,
3437            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3438        );
3439
3440        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3441
3442        // First and second batch in same row group -> same dictionary
3443        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3444        // Third batch spans the row group boundary -> computed dictionary
3445        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3446        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3447        // Fourth, fifth and sixth from same row group -> same dictionary
3448        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3449        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3450    }
3451
3452    #[test]
3453    fn test_read_null_list() {
3454        let testdata = arrow::util::test_util::parquet_test_data();
3455        let path = format!("{testdata}/null_list.parquet");
3456        let file = File::open(path).unwrap();
3457        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3458
3459        let batch = record_batch_reader.next().unwrap().unwrap();
3460        assert_eq!(batch.num_rows(), 1);
3461        assert_eq!(batch.num_columns(), 1);
3462        assert_eq!(batch.column(0).len(), 1);
3463
3464        let list = batch
3465            .column(0)
3466            .as_any()
3467            .downcast_ref::<ListArray>()
3468            .unwrap();
3469        assert_eq!(list.len(), 1);
3470        assert!(list.is_valid(0));
3471
3472        let val = list.value(0);
3473        assert_eq!(val.len(), 0);
3474    }
3475
3476    #[test]
3477    fn test_null_schema_inference() {
3478        let testdata = arrow::util::test_util::parquet_test_data();
3479        let path = format!("{testdata}/null_list.parquet");
3480        let file = File::open(path).unwrap();
3481
3482        let arrow_field = Field::new(
3483            "emptylist",
3484            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3485            true,
3486        );
3487
3488        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3489        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3490        let schema = builder.schema();
3491        assert_eq!(schema.fields().len(), 1);
3492        assert_eq!(schema.field(0), &arrow_field);
3493    }
3494
3495    #[test]
3496    fn test_skip_metadata() {
3497        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3498        let field = Field::new("col", col.data_type().clone(), true);
3499
3500        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3501
3502        let metadata = [("key".to_string(), "value".to_string())]
3503            .into_iter()
3504            .collect();
3505
3506        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3507
3508        assert_ne!(schema_with_metadata, schema_without_metadata);
3509
3510        let batch =
3511            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3512
3513        let file = |version: WriterVersion| {
3514            let props = WriterProperties::builder()
3515                .set_writer_version(version)
3516                .build();
3517
3518            let file = tempfile().unwrap();
3519            let mut writer =
3520                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3521                    .unwrap();
3522            writer.write(&batch).unwrap();
3523            writer.close().unwrap();
3524            file
3525        };
3526
3527        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3528
3529        let v1_reader = file(WriterVersion::PARQUET_1_0);
3530        let v2_reader = file(WriterVersion::PARQUET_2_0);
3531
3532        let arrow_reader =
3533            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3534        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3535
3536        let reader =
3537            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3538                .unwrap()
3539                .build()
3540                .unwrap();
3541        assert_eq!(reader.schema(), schema_without_metadata);
3542
3543        let arrow_reader =
3544            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3545        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3546
3547        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3548            .unwrap()
3549            .build()
3550            .unwrap();
3551        assert_eq!(reader.schema(), schema_without_metadata);
3552    }
3553
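    /// Writes the given (column name, array) pairs to a temporary parquet file using
    /// the Arrow writer and returns the file handle.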
3554    fn write_parquet_from_iter<I, F>(value: I) -> File
3555    where
3556        I: IntoIterator<Item = (F, ArrayRef)>,
3557        F: AsRef<str>,
3558    {
3559        let batch = RecordBatch::try_from_iter(value).unwrap();
3560        let file = tempfile().unwrap();
3561        let mut writer =
3562            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3563        writer.write(&batch).unwrap();
3564        writer.close().unwrap();
3565        file
3566    }
3567
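    /// Writes `value` to a parquet file and asserts that opening it with the supplied
    /// `schema` override fails with `expected_error`.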
3568    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3569    where
3570        I: IntoIterator<Item = (F, ArrayRef)>,
3571        F: AsRef<str>,
3572    {
3573        let file = write_parquet_from_iter(value);
3574        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3575        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3576            file.try_clone().unwrap(),
3577            options_with_schema,
3578        );
3579        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3580    }
3581
3582    #[test]
3583    fn test_schema_too_few_columns() {
3584        run_schema_test_with_error(
3585            vec![
3586                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3587                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3588            ],
3589            Arc::new(Schema::new(vec![Field::new(
3590                "int64",
3591                ArrowDataType::Int64,
3592                false,
3593            )])),
3594            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3595        );
3596    }
3597
3598    #[test]
3599    fn test_schema_too_many_columns() {
3600        run_schema_test_with_error(
3601            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3602            Arc::new(Schema::new(vec![
3603                Field::new("int64", ArrowDataType::Int64, false),
3604                Field::new("int32", ArrowDataType::Int32, false),
3605            ])),
3606            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3607        );
3608    }
3609
3610    #[test]
3611    fn test_schema_mismatched_column_names() {
3612        run_schema_test_with_error(
3613            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3614            Arc::new(Schema::new(vec![Field::new(
3615                "other",
3616                ArrowDataType::Int64,
3617                false,
3618            )])),
3619            "Arrow: incompatible arrow schema, expected field named int64 got other",
3620        );
3621    }
3622
3623    #[test]
3624    fn test_schema_incompatible_columns() {
3625        run_schema_test_with_error(
3626            vec![
3627                (
3628                    "col1_invalid",
3629                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3630                ),
3631                (
3632                    "col2_valid",
3633                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3634                ),
3635                (
3636                    "col3_invalid",
3637                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3638                ),
3639            ],
3640            Arc::new(Schema::new(vec![
3641                Field::new("col1_invalid", ArrowDataType::Int32, false),
3642                Field::new("col2_valid", ArrowDataType::Int32, false),
3643                Field::new("col3_invalid", ArrowDataType::Int32, false),
3644            ])),
3645            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3646        );
3647    }
3648
3649    #[test]
3650    fn test_one_incompatible_nested_column() {
3651        let nested_fields = Fields::from(vec![
3652            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3653            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3654        ]);
3655        let nested = StructArray::try_new(
3656            nested_fields,
3657            vec![
3658                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3659                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3660            ],
3661            None,
3662        )
3663        .expect("struct array");
3664        let supplied_nested_fields = Fields::from(vec![
3665            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3666            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3667        ]);
3668        run_schema_test_with_error(
3669            vec![
3670                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3671                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3672                ("nested", Arc::new(nested) as ArrayRef),
3673            ],
3674            Arc::new(Schema::new(vec![
3675                Field::new("col1", ArrowDataType::Int64, false),
3676                Field::new("col2", ArrowDataType::Int32, false),
3677                Field::new(
3678                    "nested",
3679                    ArrowDataType::Struct(supplied_nested_fields),
3680                    false,
3681                ),
3682            ])),
3683            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3684            requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
3685            but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
3686        );
3687    }
3688
3689    /// Return parquet data with a single column of utf8 strings
3690    fn utf8_parquet() -> Bytes {
3691        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3692        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3693        let props = None;
3694        // write parquet file with non-nullable strings
3695        let mut parquet_data = vec![];
3696        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3697        writer.write(&batch).unwrap();
3698        writer.close().unwrap();
3699        Bytes::from(parquet_data)
3700    }
3701
3702    #[test]
3703    fn test_schema_error_bad_types() {
3704        // verify incompatible schemas error on read
3705        let parquet_data = utf8_parquet();
3706
3707        // Ask to read it back with an incompatible schema (int vs string)
3708        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3709            "column1",
3710            arrow::datatypes::DataType::Int32,
3711            false,
3712        )]));
3713
3714        // read it back out
3715        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3716        let err =
3717            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3718                .unwrap_err();
3719        assert_eq!(
3720            err.to_string(),
3721            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
3722        )
3723    }
3724
3725    #[test]
3726    fn test_schema_error_bad_nullability() {
3727        // verify incompatible schemas error on read
3728        let parquet_data = utf8_parquet();
3729
3730        // Ask to read it back with an incompatible schema (nullability mismatch)
3731        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3732            "column1",
3733            arrow::datatypes::DataType::Utf8,
3734            true,
3735        )]));
3736
3737        // read it back out
3738        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3739        let err =
3740            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3741                .unwrap_err();
3742        assert_eq!(
3743            err.to_string(),
3744            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
3745        )
3746    }
3747
3748    #[test]
3749    fn test_read_binary_as_utf8() {
3750        let file = write_parquet_from_iter(vec![
3751            (
3752                "binary_to_utf8",
3753                Arc::new(BinaryArray::from(vec![
3754                    b"one".as_ref(),
3755                    b"two".as_ref(),
3756                    b"three".as_ref(),
3757                ])) as ArrayRef,
3758            ),
3759            (
3760                "large_binary_to_large_utf8",
3761                Arc::new(LargeBinaryArray::from(vec![
3762                    b"one".as_ref(),
3763                    b"two".as_ref(),
3764                    b"three".as_ref(),
3765                ])) as ArrayRef,
3766            ),
3767            (
3768                "binary_view_to_utf8_view",
3769                Arc::new(BinaryViewArray::from(vec![
3770                    b"one".as_ref(),
3771                    b"two".as_ref(),
3772                    b"three".as_ref(),
3773                ])) as ArrayRef,
3774            ),
3775        ]);
3776        let supplied_fields = Fields::from(vec![
3777            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3778            Field::new(
3779                "large_binary_to_large_utf8",
3780                ArrowDataType::LargeUtf8,
3781                false,
3782            ),
3783            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3784        ]);
3785
3786        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3787        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3788            file.try_clone().unwrap(),
3789            options,
3790        )
3791        .expect("reader builder with schema")
3792        .build()
3793        .expect("reader with schema");
3794
3795        let batch = arrow_reader.next().unwrap().unwrap();
3796        assert_eq!(batch.num_columns(), 3);
3797        assert_eq!(batch.num_rows(), 3);
3798        assert_eq!(
3799            batch
3800                .column(0)
3801                .as_string::<i32>()
3802                .iter()
3803                .collect::<Vec<_>>(),
3804            vec![Some("one"), Some("two"), Some("three")]
3805        );
3806
3807        assert_eq!(
3808            batch
3809                .column(1)
3810                .as_string::<i64>()
3811                .iter()
3812                .collect::<Vec<_>>(),
3813            vec![Some("one"), Some("two"), Some("three")]
3814        );
3815
3816        assert_eq!(
3817            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3818            vec![Some("one"), Some("two"), Some("three")]
3819        );
3820    }
3821
3822    #[test]
3823    #[should_panic(expected = "Invalid UTF8 sequence at")]
3824    fn test_read_non_utf8_binary_as_utf8() {
3825        let file = write_parquet_from_iter(vec![(
3826            "non_utf8_binary",
3827            Arc::new(BinaryArray::from(vec![
3828                b"\xDE\x00\xFF".as_ref(),
3829                b"\xDE\x01\xAA".as_ref(),
3830                b"\xDE\x02\xFF".as_ref(),
3831            ])) as ArrayRef,
3832        )]);
3833        let supplied_fields = Fields::from(vec![Field::new(
3834            "non_utf8_binary",
3835            ArrowDataType::Utf8,
3836            false,
3837        )]);
3838
3839        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3840        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3841            file.try_clone().unwrap(),
3842            options,
3843        )
3844        .expect("reader builder with schema")
3845        .build()
3846        .expect("reader with schema");
3847        arrow_reader.next().unwrap().unwrap_err();
3848    }
3849
3850    #[test]
3851    fn test_with_schema() {
3852        let nested_fields = Fields::from(vec![
3853            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3854            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3855        ]);
3856
3857        let nested_arrays: Vec<ArrayRef> = vec![
3858            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3859            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3860        ];
3861
3862        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3863
3864        let file = write_parquet_from_iter(vec![
3865            (
3866                "int32_to_ts_second",
3867                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3868            ),
3869            (
3870                "date32_to_date64",
3871                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3872            ),
3873            ("nested", Arc::new(nested) as ArrayRef),
3874        ]);
3875
3876        let supplied_nested_fields = Fields::from(vec![
3877            Field::new(
3878                "utf8_to_dict",
3879                ArrowDataType::Dictionary(
3880                    Box::new(ArrowDataType::Int32),
3881                    Box::new(ArrowDataType::Utf8),
3882                ),
3883                false,
3884            ),
3885            Field::new(
3886                "int64_to_ts_nano",
3887                ArrowDataType::Timestamp(
3888                    arrow::datatypes::TimeUnit::Nanosecond,
3889                    Some("+10:00".into()),
3890                ),
3891                false,
3892            ),
3893        ]);
3894
3895        let supplied_schema = Arc::new(Schema::new(vec![
3896            Field::new(
3897                "int32_to_ts_second",
3898                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3899                false,
3900            ),
3901            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3902            Field::new(
3903                "nested",
3904                ArrowDataType::Struct(supplied_nested_fields),
3905                false,
3906            ),
3907        ]));
3908
3909        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3910        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3911            file.try_clone().unwrap(),
3912            options,
3913        )
3914        .expect("reader builder with schema")
3915        .build()
3916        .expect("reader with schema");
3917
3918        assert_eq!(arrow_reader.schema(), supplied_schema);
3919        let batch = arrow_reader.next().unwrap().unwrap();
3920        assert_eq!(batch.num_columns(), 3);
3921        assert_eq!(batch.num_rows(), 4);
3922        assert_eq!(
3923            batch
3924                .column(0)
3925                .as_any()
3926                .downcast_ref::<TimestampSecondArray>()
3927                .expect("downcast to timestamp second")
3928                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3929                .map(|v| v.to_string())
3930                .expect("value as datetime"),
3931            "1970-01-01 01:00:00 +01:00"
3932        );
3933        assert_eq!(
3934            batch
3935                .column(1)
3936                .as_any()
3937                .downcast_ref::<Date64Array>()
3938                .expect("downcast to date64")
3939                .value_as_date(0)
3940                .map(|v| v.to_string())
3941                .expect("value as date"),
3942            "1970-01-01"
3943        );
3944
3945        let nested = batch
3946            .column(2)
3947            .as_any()
3948            .downcast_ref::<StructArray>()
3949            .expect("downcast to struct");
3950
3951        let nested_dict = nested
3952            .column(0)
3953            .as_any()
3954            .downcast_ref::<Int32DictionaryArray>()
3955            .expect("downcast to dictionary");
3956
3957        assert_eq!(
3958            nested_dict
3959                .values()
3960                .as_any()
3961                .downcast_ref::<StringArray>()
3962                .expect("downcast to string")
3963                .iter()
3964                .collect::<Vec<_>>(),
3965            vec![Some("a"), Some("b")]
3966        );
3967
3968        assert_eq!(
3969            nested_dict.keys().iter().collect::<Vec<_>>(),
3970            vec![Some(0), Some(0), Some(0), Some(1)]
3971        );
3972
3973        assert_eq!(
3974            nested
3975                .column(1)
3976                .as_any()
3977                .downcast_ref::<TimestampNanosecondArray>()
3978                .expect("downcast to timestamp nanosecond")
3979                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3980                .map(|v| v.to_string())
3981                .expect("value as datetime"),
3982            "1970-01-01 10:00:00.000000001 +10:00"
3983        );
3984    }
3985
3986    #[test]
3987    fn test_empty_projection() {
3988        let testdata = arrow::util::test_util::parquet_test_data();
3989        let path = format!("{testdata}/alltypes_plain.parquet");
3990        let file = File::open(path).unwrap();
3991
3992        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3993        let file_metadata = builder.metadata().file_metadata();
3994        let expected_rows = file_metadata.num_rows() as usize;
3995
3996        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3997        let batch_reader = builder
3998            .with_projection(mask)
3999            .with_batch_size(2)
4000            .build()
4001            .unwrap();
4002
4003        let mut total_rows = 0;
4004        for maybe_batch in batch_reader {
4005            let batch = maybe_batch.unwrap();
4006            total_rows += batch.num_rows();
4007            assert_eq!(batch.num_columns(), 0);
4008            assert!(batch.num_rows() <= 2);
4009        }
4010
4011        assert_eq!(total_rows, expected_rows);
4012    }
4013
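    /// Writes two record batches of `batch_size` empty lists into row groups capped at
    /// `row_group_size` rows, then checks that the reader still yields two full
    /// `batch_size` batches, including when batches span row group boundaries.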
4014    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4015        let schema = Arc::new(Schema::new(vec![Field::new(
4016            "list",
4017            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4018            true,
4019        )]));
4020
4021        let mut buf = Vec::with_capacity(1024);
4022
4023        let mut writer = ArrowWriter::try_new(
4024            &mut buf,
4025            schema.clone(),
4026            Some(
4027                WriterProperties::builder()
4028                    .set_max_row_group_size(row_group_size)
4029                    .build(),
4030            ),
4031        )
4032        .unwrap();
4033        for _ in 0..2 {
4034            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4035            for _ in 0..(batch_size) {
4036                list_builder.append(true);
4037            }
4038            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4039                .unwrap();
4040            writer.write(&batch).unwrap();
4041        }
4042        writer.close().unwrap();
4043
4044        let mut record_reader =
4045            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4046        assert_eq!(
4047            batch_size,
4048            record_reader.next().unwrap().unwrap().num_rows()
4049        );
4050        assert_eq!(
4051            batch_size,
4052            record_reader.next().unwrap().unwrap().num_rows()
4053        );
4054    }
4055
4056    #[test]
4057    fn test_row_group_exact_multiple() {
4058        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4059        test_row_group_batch(8, 8);
4060        test_row_group_batch(10, 8);
4061        test_row_group_batch(8, 10);
4062        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4063        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4064        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4065        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4066        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4067    }
4068
4069    /// Given a RecordBatch containing all the column data, return the expected batches given
4070    /// a `batch_size` and `selection`
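    ///
    /// Each contiguous run of selected rows becomes one expected batch; the sanity
    /// check at the end relies on the test selections aligning those runs with
    /// `batch_size`.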
4071    fn get_expected_batches(
4072        column: &RecordBatch,
4073        selection: &RowSelection,
4074        batch_size: usize,
4075    ) -> Vec<RecordBatch> {
4076        let mut expected_batches = vec![];
4077
4078        let mut selection: VecDeque<_> = selection.clone().into();
4079        let mut row_offset = 0;
4080        let mut last_start = None;
4081        while row_offset < column.num_rows() && !selection.is_empty() {
4082            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4083            while batch_remaining > 0 && !selection.is_empty() {
4084                let (to_read, skip) = match selection.front_mut() {
4085                    Some(selection) if selection.row_count > batch_remaining => {
4086                        selection.row_count -= batch_remaining;
4087                        (batch_remaining, selection.skip)
4088                    }
4089                    Some(_) => {
4090                        let select = selection.pop_front().unwrap();
4091                        (select.row_count, select.skip)
4092                    }
4093                    None => break,
4094                };
4095
4096                batch_remaining -= to_read;
4097
4098                match skip {
4099                    true => {
4100                        if let Some(last_start) = last_start.take() {
4101                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4102                        }
4103                        row_offset += to_read
4104                    }
4105                    false => {
4106                        last_start.get_or_insert(row_offset);
4107                        row_offset += to_read
4108                    }
4109                }
4110            }
4111        }
4112
4113        if let Some(last_start) = last_start.take() {
4114            expected_batches.push(column.slice(last_start, row_offset - last_start))
4115        }
4116
4117        // Sanity check: all batches except the final one should have exactly `batch_size` rows
4118        for batch in &expected_batches[..expected_batches.len() - 1] {
4119            assert_eq!(batch.num_rows(), batch_size);
4120        }
4121
4122        expected_batches
4123    }
4124
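    /// Builds an alternating select/skip `RowSelection` over `total_len` rows using
    /// runs of `step_len` (starting with a skip when `skip_first` is true) and returns
    /// it together with the number of selected rows.
    ///
    /// For example, `create_test_selection(2, 5, false)` yields the selectors
    /// `[select 2, skip 2, select 1]` and a selected count of 3.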
4125    fn create_test_selection(
4126        step_len: usize,
4127        total_len: usize,
4128        skip_first: bool,
4129    ) -> (RowSelection, usize) {
4130        let mut remaining = total_len;
4131        let mut skip = skip_first;
4132        let mut vec = vec![];
4133        let mut selected_count = 0;
4134        while remaining != 0 {
4135            let step = if remaining > step_len {
4136                step_len
4137            } else {
4138                remaining
4139            };
4140            vec.push(RowSelector {
4141                row_count: step,
4142                skip,
4143            });
4144            remaining -= step;
4145            if !skip {
4146                selected_count += step;
4147            }
4148            skip = !skip;
4149        }
4150        (vec.into(), selected_count)
4151    }
4152
4153    #[test]
4154    fn test_scan_row_with_selection() {
4155        let testdata = arrow::util::test_util::parquet_test_data();
4156        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4157        let test_file = File::open(&path).unwrap();
4158
4159        let mut serial_reader =
4160            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4161        let data = serial_reader.next().unwrap().unwrap();
4162
4163        let do_test = |batch_size: usize, selection_len: usize| {
4164            for skip_first in [false, true] {
4165                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4166
4167                let expected = get_expected_batches(&data, &selections, batch_size);
4168                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4169                assert_eq!(
4170                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4171                    expected,
4172                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4173                );
4174            }
4175        };
4176
4177        // total row count 7300
4178        // 1. test selection len more than one page row count
4179        do_test(1000, 1000);
4180
4181        // 2. test selection len less than one page row count
4182        do_test(20, 20);
4183
4184        // 3. test selection_len less than batch_size
4185        do_test(20, 5);
4186
4187        // 4. test selection_len more than batch_size
4188        // If batch_size < selection_len
4189        do_test(20, 5);
4190
4191        fn create_skip_reader(
4192            test_file: &File,
4193            batch_size: usize,
4194            selections: RowSelection,
4195        ) -> ParquetRecordBatchReader {
4196            let options = ArrowReaderOptions::new().with_page_index(true);
4197            let file = test_file.try_clone().unwrap();
4198            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4199                .unwrap()
4200                .with_batch_size(batch_size)
4201                .with_row_selection(selections)
4202                .build()
4203                .unwrap()
4204        }
4205    }
4206
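    // The builder clamps the configured batch size to the number of rows in the file, so an
    // over-large `with_batch_size` value does not lead to over-allocation.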
4207    #[test]
4208    fn test_batch_size_overallocate() {
4209        let testdata = arrow::util::test_util::parquet_test_data();
4210        // `alltypes_plain.parquet` only has 8 rows
4211        let path = format!("{testdata}/alltypes_plain.parquet");
4212        let test_file = File::open(path).unwrap();
4213
4214        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4215        let num_rows = builder.metadata.file_metadata().num_rows();
4216        let reader = builder
4217            .with_batch_size(1024)
4218            .with_projection(ProjectionMask::all())
4219            .build()
4220            .unwrap();
4221        assert_ne!(1024, num_rows);
4222        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4223    }
4224
4225    #[test]
4226    fn test_read_with_page_index_enabled() {
4227        let testdata = arrow::util::test_util::parquet_test_data();
4228
4229        {
4230            // `alltypes_tiny_pages.parquet` has a page index
4231            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4232            let test_file = File::open(path).unwrap();
4233            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4234                test_file,
4235                ArrowReaderOptions::new().with_page_index(true),
4236            )
4237            .unwrap();
4238            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4239            let reader = builder.build().unwrap();
4240            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4241            assert_eq!(batches.len(), 8);
4242        }
4243
4244        {
4245            // `alltypes_plain.parquet` doesn't have a page index
4246            let path = format!("{testdata}/alltypes_plain.parquet");
4247            let test_file = File::open(path).unwrap();
4248            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4249                test_file,
4250                ArrowReaderOptions::new().with_page_index(true),
4251            )
4252            .unwrap();
4253            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4254            // the file should still be read successfully.
4255            assert!(builder.metadata().offset_index().is_none());
4256            let reader = builder.build().unwrap();
4257            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4258            assert_eq!(batches.len(), 1);
4259        }
4260    }
4261
4262    #[test]
4263    fn test_raw_repetition() {
4264        const MESSAGE_TYPE: &str = "
4265            message Log {
4266              OPTIONAL INT32 eventType;
4267              REPEATED INT32 category;
4268              REPEATED group filter {
4269                OPTIONAL INT32 error;
4270              }
4271            }
4272        ";
4273        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4274        let props = Default::default();
4275
4276        let mut buf = Vec::with_capacity(1024);
4277        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4278        let mut row_group_writer = writer.next_row_group().unwrap();
4279
4280        // column 0
4281        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4282        col_writer
4283            .typed::<Int32Type>()
4284            .write_batch(&[1], Some(&[1]), None)
4285            .unwrap();
4286        col_writer.close().unwrap();
4287        // column 1
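        // two values, both defined (definition level 1); repetition levels [0, 1] place them
        // within the same single record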
4288        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4289        col_writer
4290            .typed::<Int32Type>()
4291            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4292            .unwrap();
4293        col_writer.close().unwrap();
4294        // column 2
4295        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4296        col_writer
4297            .typed::<Int32Type>()
4298            .write_batch(&[1], Some(&[1]), Some(&[0]))
4299            .unwrap();
4300        col_writer.close().unwrap();
4301
4302        let rg_md = row_group_writer.close().unwrap();
4303        assert_eq!(rg_md.num_rows(), 1);
4304        writer.close().unwrap();
4305
4306        let bytes = Bytes::from(buf);
4307
4308        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4309        let full = no_mask.next().unwrap().unwrap();
4310
4311        assert_eq!(full.num_columns(), 3);
4312
4313        for idx in 0..3 {
4314            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4315            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4316            let mut reader = b.with_projection(mask).build().unwrap();
4317            let projected = reader.next().unwrap().unwrap();
4318
4319            assert_eq!(projected.num_columns(), 1);
4320            assert_eq!(full.column(idx), projected.column(0));
4321        }
4322    }
4323
4324    #[test]
4325    fn test_read_lz4_raw() {
4326        let testdata = arrow::util::test_util::parquet_test_data();
4327        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4328        let file = File::open(path).unwrap();
4329
4330        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4331            .unwrap()
4332            .collect::<Result<Vec<_>, _>>()
4333            .unwrap();
4334        assert_eq!(batches.len(), 1);
4335        let batch = &batches[0];
4336
4337        assert_eq!(batch.num_columns(), 3);
4338        assert_eq!(batch.num_rows(), 4);
4339
4340        // https://github.com/apache/parquet-testing/pull/18
4341        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4342        assert_eq!(
4343            a.values(),
4344            &[1593604800, 1593604800, 1593604801, 1593604801]
4345        );
4346
4347        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4348        let a: Vec<_> = a.iter().flatten().collect();
4349        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4350
4351        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4352        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4353    }
4354
4355    // This test ensures backward compatibility. It reads two files that both declare the LZ4
4356    // CompressionCodec but use different algorithms: LZ4_HADOOP and LZ4_RAW.
4357    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that was
4358    //    compressed with the LZ4_HADOOP algorithm.
4359    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that was
4360    //    compressed with the LZ4_RAW algorithm. Reading it relies on a fallback kept for
4361    //    backward compatibility with older parquet-cpp versions.
4362    //
4363    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4364    #[test]
4365    fn test_read_lz4_hadoop_fallback() {
4366        for file in [
4367            "hadoop_lz4_compressed.parquet",
4368            "non_hadoop_lz4_compressed.parquet",
4369        ] {
4370            let testdata = arrow::util::test_util::parquet_test_data();
4371            let path = format!("{testdata}/{file}");
4372            let file = File::open(path).unwrap();
4373            let expected_rows = 4;
4374
4375            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4376                .unwrap()
4377                .collect::<Result<Vec<_>, _>>()
4378                .unwrap();
4379            assert_eq!(batches.len(), 1);
4380            let batch = &batches[0];
4381
4382            assert_eq!(batch.num_columns(), 3);
4383            assert_eq!(batch.num_rows(), expected_rows);
4384
4385            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4386            assert_eq!(
4387                a.values(),
4388                &[1593604800, 1593604800, 1593604801, 1593604801]
4389            );
4390
4391            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4392            let b: Vec<_> = b.iter().flatten().collect();
4393            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4394
4395            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4396            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4397        }
4398    }
4399
4400    #[test]
4401    fn test_read_lz4_hadoop_large() {
4402        let testdata = arrow::util::test_util::parquet_test_data();
4403        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4404        let file = File::open(path).unwrap();
4405        let expected_rows = 10000;
4406
4407        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4408            .unwrap()
4409            .collect::<Result<Vec<_>, _>>()
4410            .unwrap();
4411        assert_eq!(batches.len(), 1);
4412        let batch = &batches[0];
4413
4414        assert_eq!(batch.num_columns(), 1);
4415        assert_eq!(batch.num_rows(), expected_rows);
4416
4417        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4418        let a: Vec<_> = a.iter().flatten().collect();
4419        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4420        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4421        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4422        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4423    }
4424
4425    #[test]
4426    #[cfg(feature = "snap")]
4427    fn test_read_nested_lists() {
4428        let testdata = arrow::util::test_util::parquet_test_data();
4429        let path = format!("{testdata}/nested_lists.snappy.parquet");
4430        let file = File::open(path).unwrap();
4431
4432        let f = file.try_clone().unwrap();
4433        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4434        let expected = reader.next().unwrap().unwrap();
4435        assert_eq!(expected.num_rows(), 3);
4436
4437        let selection = RowSelection::from(vec![
4438            RowSelector::skip(1),
4439            RowSelector::select(1),
4440            RowSelector::skip(1),
4441        ]);
4442        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4443            .unwrap()
4444            .with_row_selection(selection)
4445            .build()
4446            .unwrap();
4447
4448        let actual = reader.next().unwrap().unwrap();
4449        assert_eq!(actual.num_rows(), 1);
4450        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4451    }
4452
4453    #[test]
4454    fn test_arbitrary_decimal() {
4455        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4456        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4457            .with_precision_and_scale(19, 0)
4458            .unwrap();
4459        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4460            .with_precision_and_scale(12, 0)
4461            .unwrap();
4462        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4463            .with_precision_and_scale(17, 10)
4464            .unwrap();
4465
4466        let written = RecordBatch::try_from_iter([
4467            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4468            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4469            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4470        ])
4471        .unwrap();
4472
4473        let mut buffer = Vec::with_capacity(1024);
4474        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4475        writer.write(&written).unwrap();
4476        writer.close().unwrap();
4477
4478        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4479            .unwrap()
4480            .collect::<Result<Vec<_>, _>>()
4481            .unwrap();
4482
4483        assert_eq!(&written.slice(0, 8), &read[0]);
4484    }
4485
4486    #[test]
4487    fn test_list_skip() {
4488        let mut list = ListBuilder::new(Int32Builder::new());
4489        list.append_value([Some(1), Some(2)]);
4490        list.append_value([Some(3)]);
4491        list.append_value([Some(4)]);
4492        let list = list.finish();
4493        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4494
4495        // First page contains 2 values but only 1 row
4496        let props = WriterProperties::builder()
4497            .set_data_page_row_count_limit(1)
4498            .set_write_batch_size(2)
4499            .build();
4500
4501        let mut buffer = Vec::with_capacity(1024);
4502        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4503        writer.write(&batch).unwrap();
4504        writer.close().unwrap();
4505
4506        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4507        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4508            .unwrap()
4509            .with_row_selection(selection.into())
4510            .build()
4511            .unwrap();
4512        let out = reader.next().unwrap().unwrap();
4513        assert_eq!(out.num_rows(), 1);
4514        assert_eq!(out, batch.slice(2, 1));
4515    }
4516
4517    fn test_decimal32_roundtrip() {
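        // Decimal32 precision is at most 9, so values are always written as INT32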
4518        let d = |values: Vec<i32>, p: u8| {
4519            let iter = values.into_iter();
4520            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4521                .with_precision_and_scale(p, 2)
4522                .unwrap()
4523        };
4524
4525        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4526        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4527
4528        let mut buffer = Vec::with_capacity(1024);
4529        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4530        writer.write(&batch).unwrap();
4531        writer.close().unwrap();
4532
4533        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4534        let t1 = builder.parquet_schema().columns()[0].physical_type();
4535        assert_eq!(t1, PhysicalType::INT32);
4536
4537        let mut reader = builder.build().unwrap();
4538        assert_eq!(batch.schema(), reader.schema());
4539
4540        let out = reader.next().unwrap().unwrap();
4541        assert_eq!(batch, out);
4542    }
4543
4544    fn test_decimal64_roundtrip() {
4545        // Precision <= 9 -> INT32
4546        // Precision <= 18 -> INT64
4547
4548        let d = |values: Vec<i64>, p: u8| {
4549            let iter = values.into_iter();
4550            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4551                .with_precision_and_scale(p, 2)
4552                .unwrap()
4553        };
4554
4555        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4556        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4557        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4558
4559        let batch = RecordBatch::try_from_iter([
4560            ("d1", Arc::new(d1) as ArrayRef),
4561            ("d2", Arc::new(d2) as ArrayRef),
4562            ("d3", Arc::new(d3) as ArrayRef),
4563        ])
4564        .unwrap();
4565
4566        let mut buffer = Vec::with_capacity(1024);
4567        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4568        writer.write(&batch).unwrap();
4569        writer.close().unwrap();
4570
4571        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4572        let t1 = builder.parquet_schema().columns()[0].physical_type();
4573        assert_eq!(t1, PhysicalType::INT32);
4574        let t2 = builder.parquet_schema().columns()[1].physical_type();
4575        assert_eq!(t2, PhysicalType::INT64);
4576        let t3 = builder.parquet_schema().columns()[2].physical_type();
4577        assert_eq!(t3, PhysicalType::INT64);
4578
4579        let mut reader = builder.build().unwrap();
4580        assert_eq!(batch.schema(), reader.schema());
4581
4582        let out = reader.next().unwrap().unwrap();
4583        assert_eq!(batch, out);
4584    }
4585
4586    fn test_decimal_roundtrip<T: DecimalType>() {
4587        // Precision <= 9 -> INT32
4588        // Precision <= 18 -> INT64
4589        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
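        // (matching the Parquet DECIMAL rules: INT32 holds precision <= 9, INT64 holds
        // precision <= 18, and larger precisions need FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY)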
4590
4591        let d = |values: Vec<usize>, p: u8| {
4592            let iter = values.into_iter().map(T::Native::usize_as);
4593            PrimitiveArray::<T>::from_iter_values(iter)
4594                .with_precision_and_scale(p, 2)
4595                .unwrap()
4596        };
4597
4598        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4599        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4600        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4601        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4602
4603        let batch = RecordBatch::try_from_iter([
4604            ("d1", Arc::new(d1) as ArrayRef),
4605            ("d2", Arc::new(d2) as ArrayRef),
4606            ("d3", Arc::new(d3) as ArrayRef),
4607            ("d4", Arc::new(d4) as ArrayRef),
4608        ])
4609        .unwrap();
4610
4611        let mut buffer = Vec::with_capacity(1024);
4612        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4613        writer.write(&batch).unwrap();
4614        writer.close().unwrap();
4615
4616        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4617        let t1 = builder.parquet_schema().columns()[0].physical_type();
4618        assert_eq!(t1, PhysicalType::INT32);
4619        let t2 = builder.parquet_schema().columns()[1].physical_type();
4620        assert_eq!(t2, PhysicalType::INT64);
4621        let t3 = builder.parquet_schema().columns()[2].physical_type();
4622        assert_eq!(t3, PhysicalType::INT64);
4623        let t4 = builder.parquet_schema().columns()[3].physical_type();
4624        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4625
4626        let mut reader = builder.build().unwrap();
4627        assert_eq!(batch.schema(), reader.schema());
4628
4629        let out = reader.next().unwrap().unwrap();
4630        assert_eq!(batch, out);
4631    }
4632
4633    #[test]
4634    fn test_decimal() {
4635        test_decimal32_roundtrip();
4636        test_decimal64_roundtrip();
4637        test_decimal_roundtrip::<Decimal128Type>();
4638        test_decimal_roundtrip::<Decimal256Type>();
4639    }
4640
4641    #[test]
4642    fn test_list_selection() {
4643        let schema = Arc::new(Schema::new(vec![Field::new_list(
4644            "list",
4645            Field::new_list_field(ArrowDataType::Utf8, true),
4646            false,
4647        )]));
4648        let mut buf = Vec::with_capacity(1024);
4649
4650        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4651
4652        for i in 0..2 {
4653            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4654            for j in 0..1024 {
4655                list_a_builder.values().append_value(format!("{i} {j}"));
4656                list_a_builder.append(true);
4657            }
4658            let batch =
4659                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4660                    .unwrap();
4661            writer.write(&batch).unwrap();
4662        }
4663        let _metadata = writer.close().unwrap();
4664
4665        let buf = Bytes::from(buf);
4666        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4667            .unwrap()
4668            .with_row_selection(RowSelection::from(vec![
4669                RowSelector::skip(100),
4670                RowSelector::select(924),
4671                RowSelector::skip(100),
4672                RowSelector::select(924),
4673            ]))
4674            .build()
4675            .unwrap();
4676
4677        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4678        let batch = concat_batches(&schema, &batches).unwrap();
4679
4680        assert_eq!(batch.num_rows(), 924 * 2);
4681        let list = batch.column(0).as_list::<i32>();
4682
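        // each written list has exactly one element, so consecutive offsets must differ by 1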
4683        for w in list.value_offsets().windows(2) {
4684            assert_eq!(w[0] + 1, w[1])
4685        }
4686        let mut values = list.values().as_string::<i32>().iter();
4687
4688        for i in 0..2 {
4689            for j in 100..1024 {
4690                let expected = format!("{i} {j}");
4691                assert_eq!(values.next().unwrap().unwrap(), &expected);
4692            }
4693        }
4694    }
4695
4696    #[test]
4697    fn test_list_selection_fuzz() {
4698        let mut rng = rng();
4699        let schema = Arc::new(Schema::new(vec![Field::new_list(
4700            "list",
4701            Field::new_list(
4702                Field::LIST_FIELD_DEFAULT_NAME,
4703                Field::new_list_field(ArrowDataType::Int32, true),
4704                true,
4705            ),
4706            true,
4707        )]));
4708        let mut buf = Vec::with_capacity(1024);
4709        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4710
4711        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4712
4713        for _ in 0..2048 {
4714            if rng.random_bool(0.2) {
4715                list_a_builder.append(false);
4716                continue;
4717            }
4718
4719            let list_a_len = rng.random_range(0..10);
4720            let list_b_builder = list_a_builder.values();
4721
4722            for _ in 0..list_a_len {
4723                if rng.random_bool(0.2) {
4724                    list_b_builder.append(false);
4725                    continue;
4726                }
4727
4728                let list_b_len = rng.random_range(0..10);
4729                let int_builder = list_b_builder.values();
4730                for _ in 0..list_b_len {
4731                    match rng.random_bool(0.2) {
4732                        true => int_builder.append_null(),
4733                        false => int_builder.append_value(rng.random()),
4734                    }
4735                }
4736                list_b_builder.append(true)
4737            }
4738            list_a_builder.append(true);
4739        }
4740
4741        let array = Arc::new(list_a_builder.finish());
4742        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4743
4744        writer.write(&batch).unwrap();
4745        let _metadata = writer.close().unwrap();
4746
4747        let buf = Bytes::from(buf);
4748
4749        let cases = [
4750            vec![
4751                RowSelector::skip(100),
4752                RowSelector::select(924),
4753                RowSelector::skip(100),
4754                RowSelector::select(924),
4755            ],
4756            vec![
4757                RowSelector::select(924),
4758                RowSelector::skip(100),
4759                RowSelector::select(924),
4760                RowSelector::skip(100),
4761            ],
4762            vec![
4763                RowSelector::skip(1023),
4764                RowSelector::select(1),
4765                RowSelector::skip(1023),
4766                RowSelector::select(1),
4767            ],
4768            vec![
4769                RowSelector::select(1),
4770                RowSelector::skip(1023),
4771                RowSelector::select(1),
4772                RowSelector::skip(1023),
4773            ],
4774        ];
4775
4776        for batch_size in [100, 1024, 2048] {
4777            for selection in &cases {
4778                let selection = RowSelection::from(selection.clone());
4779                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4780                    .unwrap()
4781                    .with_row_selection(selection.clone())
4782                    .with_batch_size(batch_size)
4783                    .build()
4784                    .unwrap();
4785
4786                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4787                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4788                assert_eq!(actual.num_rows(), selection.row_count());
4789
4790                let mut batch_offset = 0;
4791                let mut actual_offset = 0;
4792                for selector in selection.iter() {
4793                    if selector.skip {
4794                        batch_offset += selector.row_count;
4795                        continue;
4796                    }
4797
4798                    assert_eq!(
4799                        batch.slice(batch_offset, selector.row_count),
4800                        actual.slice(actual_offset, selector.row_count)
4801                    );
4802
4803                    batch_offset += selector.row_count;
4804                    actual_offset += selector.row_count;
4805                }
4806            }
4807        }
4808    }
4809
4810    #[test]
4811    fn test_read_old_nested_list() {
4812        use arrow::datatypes::DataType;
4813        use arrow::datatypes::ToByteSlice;
4814
4815        let testdata = arrow::util::test_util::parquet_test_data();
4816        // message my_record {
4817        //     REQUIRED group a (LIST) {
4818        //         REPEATED group array (LIST) {
4819        //             REPEATED INT32 array;
4820        //         }
4821        //     }
4822        // }
4823        // should be read as list<list<int32>>
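        // (a legacy list layout handled by the Parquet LIST backward-compatibility rules)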
4824        let path = format!("{testdata}/old_list_structure.parquet");
4825        let test_file = File::open(path).unwrap();
4826
4827        // create expected ListArray
4828        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4829
4830        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4831        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4832
4833        // Construct a list array from the above two
4834        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4835            "array",
4836            DataType::Int32,
4837            false,
4838        ))))
4839        .len(2)
4840        .add_buffer(a_value_offsets)
4841        .add_child_data(a_values.into_data())
4842        .build()
4843        .unwrap();
4844        let a = ListArray::from(a_list_data);
4845
4846        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4847        let mut reader = builder.build().unwrap();
4848        let out = reader.next().unwrap().unwrap();
4849        assert_eq!(out.num_rows(), 1);
4850        assert_eq!(out.num_columns(), 1);
4851        // grab first column
4852        let c0 = out.column(0);
4853        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4854        // get first row: [[1, 2], [3, 4]]
4855        let r0 = c0arr.value(0);
4856        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4857        assert_eq!(r0arr, &a);
4858    }
4859
4860    #[test]
4861    fn test_map_no_value() {
4862        // File schema:
4863        // message schema {
4864        //   required group my_map (MAP) {
4865        //     repeated group key_value {
4866        //       required int32 key;
4867        //       optional int32 value;
4868        //     }
4869        //   }
4870        //   required group my_map_no_v (MAP) {
4871        //     repeated group key_value {
4872        //       required int32 key;
4873        //     }
4874        //   }
4875        //   required group my_list (LIST) {
4876        //     repeated group list {
4877        //       required int32 element;
4878        //     }
4879        //   }
4880        // }
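        // A MAP whose key_value group has no value field is read as a list of its keys,
        // which is why my_map_no_v below compares equal to my_list.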
4881        let testdata = arrow::util::test_util::parquet_test_data();
4882        let path = format!("{testdata}/map_no_value.parquet");
4883        let file = File::open(path).unwrap();
4884
4885        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4886            .unwrap()
4887            .build()
4888            .unwrap();
4889        let out = reader.next().unwrap().unwrap();
4890        assert_eq!(out.num_rows(), 3);
4891        assert_eq!(out.num_columns(), 3);
4892        // my_map_no_v and my_list columns should now be equivalent
4893        let c0 = out.column(1).as_list::<i32>();
4894        let c1 = out.column(2).as_list::<i32>();
4895        assert_eq!(c0.len(), c1.len());
4896        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4897    }
4898
4899    #[test]
4900    fn test_get_row_group_column_bloom_filter_with_length() {
4901        // rewrite to a new parquet file whose metadata includes `bloom_filter_length`
4902        let testdata = arrow::util::test_util::parquet_test_data();
4903        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4904        let file = File::open(path).unwrap();
4905        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4906        let schema = builder.schema().clone();
4907        let reader = builder.build().unwrap();
4908
4909        let mut parquet_data = Vec::new();
4910        let props = WriterProperties::builder()
4911            .set_bloom_filter_enabled(true)
4912            .build();
4913        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
4914        for batch in reader {
4915            let batch = batch.unwrap();
4916            writer.write(&batch).unwrap();
4917        }
4918        writer.close().unwrap();
4919
4920        // test the new parquet file
4921        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
4922    }
4923
4924    #[test]
4925    fn test_get_row_group_column_bloom_filter_without_length() {
4926        let testdata = arrow::util::test_util::parquet_test_data();
4927        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4928        let data = Bytes::from(std::fs::read(path).unwrap());
4929        test_get_row_group_column_bloom_filter(data, false);
4930    }
4931
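    // `with_length` indicates whether the column metadata records `bloom_filter_length`
    // (written by newer writers); older files record only the offset, and the reader must
    // handle both when loading the SBBF.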
4932    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
4933        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
4934
4935        let metadata = builder.metadata();
4936        assert_eq!(metadata.num_row_groups(), 1);
4937        let row_group = metadata.row_group(0);
4938        let column = row_group.column(0);
4939        assert_eq!(column.bloom_filter_length().is_some(), with_length);
4940
4941        let sbbf = builder
4942            .get_row_group_column_bloom_filter(0, 0)
4943            .unwrap()
4944            .unwrap();
4945        assert!(sbbf.check(&"Hello"));
4946        assert!(!sbbf.check(&"Hello_Not_Exists"));
4947    }
4948}