parquet/arrow/arrow_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
32use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
33use crate::bloom_filter::{
34    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
35};
36use crate::column::page::{PageIterator, PageReader};
37#[cfg(feature = "encryption")]
38use crate::encryption::decrypt::FileDecryptionProperties;
39use crate::errors::{ParquetError, Result};
40use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
41use crate::file::reader::{ChunkReader, SerializedPageReader};
42use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
43use crate::schema::types::SchemaDescriptor;
44
45use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
46pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
47
48mod filter;
49pub mod metrics;
50mod read_plan;
51mod selection;
52pub mod statistics;
53
54/// Builder for constructing Parquet readers that decode into [Apache Arrow]
55/// arrays.
56///
57/// Most users should use one of the following specializations:
58///
59/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
60/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
61///
62/// # Features
63/// * Projection pushdown: [`Self::with_projection`]
64/// * Cached metadata: [`ArrowReaderMetadata::load`]
65/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
66/// * Row group filtering: [`Self::with_row_groups`]
67/// * Range filtering: [`Self::with_row_selection`]
68/// * Row level filtering: [`Self::with_row_filter`]
69///
70/// # Implementing Predicate Pushdown
71///
72/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
73/// process, which is efficient and allows the most low level optimizations.
74///
75/// However, most Parquet based systems will apply filters at many steps prior
76/// to decoding such as pruning files, row groups and data pages. This crate
77/// provides the low level APIs needed to implement such filtering, but does not
78/// include any logic to actually evaluate predicates. For example:
79///
80/// * [`Self::with_row_groups`] for Row Group pruning
81/// * [`Self::with_row_selection`] for data page pruning
82/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
83///
84/// The rationale for this design is that implementing predicate pushdown is a
85/// complex topic and varies significantly from system to system. For example
86///
87/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
88/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
89/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
90/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
91///
92/// You can read more about this design in the [Querying Parquet with
93/// Millisecond Latency] Arrow blog post.
94///
95/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
96/// [Apache Arrow]: https://arrow.apache.org/
97/// [`StatisticsConverter`]: statistics::StatisticsConverter
98/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
    /// The input this builder was created with
    pub(crate) input: T,

    /// The [`ParquetMetaData`] for this parquet file, see [`Self::metadata`]
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// The arrow schema for this parquet file, see [`Self::schema`]
    pub(crate) schema: SchemaRef,

    /// Field information taken from the [`ArrowReaderMetadata`] used to
    /// create this builder, if any
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// The size of [`RecordBatch`] to produce, see [`Self::with_batch_size`]
    pub(crate) batch_size: usize,

    /// The row group indexes to read, see [`Self::with_row_groups`]
    pub(crate) row_groups: Option<Vec<usize>>,

    /// The columns to read, see [`Self::with_projection`]
    pub(crate) projection: ProjectionMask,

    /// Optional [`RowFilter`], see [`Self::with_row_filter`]
    pub(crate) filter: Option<RowFilter>,

    /// Optional [`RowSelection`], see [`Self::with_row_selection`]
    pub(crate) selection: Option<RowSelection>,

    /// Optional limit on the number of rows read, see [`Self::with_limit`]
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip, see [`Self::with_offset`]
    pub(crate) offset: Option<usize>,

    /// Metrics collection settings, see [`Self::with_metrics`]
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum size of the predicate cache in bytes, see
    /// [`Self::with_max_predicate_cache_size`]
    pub(crate) max_predicate_cache_size: usize,
}
126
127impl<T: Debug> Debug for ArrowReaderBuilder<T> {
128    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("ArrowReaderBuilder<T>")
130            .field("input", &self.input)
131            .field("metadata", &self.metadata)
132            .field("schema", &self.schema)
133            .field("fields", &self.fields)
134            .field("batch_size", &self.batch_size)
135            .field("row_groups", &self.row_groups)
136            .field("projection", &self.projection)
137            .field("filter", &self.filter)
138            .field("selection", &self.selection)
139            .field("limit", &self.limit)
140            .field("offset", &self.offset)
141            .field("metrics", &self.metrics)
142            .finish()
143    }
144}
145
146impl<T> ArrowReaderBuilder<T> {
147    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
148        Self {
149            input,
150            metadata: metadata.metadata,
151            schema: metadata.schema,
152            fields: metadata.fields,
153            batch_size: 1024,
154            row_groups: None,
155            projection: ProjectionMask::all(),
156            filter: None,
157            selection: None,
158            limit: None,
159            offset: None,
160            metrics: ArrowReaderMetrics::Disabled,
161            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
162        }
163    }
164
    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    ///
    /// The metadata is held in an [`Arc`], so the returned handle can be
    /// cloned without copying the metadata itself.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }
169
    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    ///
    /// The descriptor is taken from the file metadata held by this builder.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }
174
    /// Returns the arrow [`SchemaRef`] for this parquet file
    ///
    /// This is the schema carried by the [`ArrowReaderMetadata`] the builder
    /// was created from.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
179
180    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
181    /// If the batch_size more than the file row count, use the file row count.
182    pub fn with_batch_size(self, batch_size: usize) -> Self {
183        // Try to avoid allocate large buffer
184        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
185        Self { batch_size, ..self }
186    }
187
188    /// Only read data from the provided row group indexes
189    ///
190    /// This is also called row group filtering
191    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
192        Self {
193            row_groups: Some(row_groups),
194            ..self
195        }
196    }
197
198    /// Only read data from the provided column indexes
199    pub fn with_projection(self, mask: ProjectionMask) -> Self {
200        Self {
201            projection: mask,
202            ..self
203        }
204    }
205
206    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
207    /// data into memory.
208    ///
209    /// This feature is used to restrict which rows are decoded within row
210    /// groups, skipping ranges of rows that are not needed. Such selections
211    /// could be determined by evaluating predicates against the parquet page
212    /// [`Index`] or some other external information available to a query
213    /// engine.
214    ///
215    /// # Notes
216    ///
217    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
218    /// applying the row selection, and therefore rows from skipped row groups
219    /// should not be included in the [`RowSelection`] (see example below)
220    ///
    /// It is recommended to enable reading the page index if using this
    /// functionality, to allow more efficient skipping over data pages. See
    /// [`ArrowReaderOptions::with_page_index`].
224    ///
225    /// # Example
226    ///
227    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
228    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
229    /// row group 3:
230    ///
231    /// ```text
232    ///   Row Group 0, 1000 rows (selected)
233    ///   Row Group 1, 1000 rows (skipped)
234    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
235    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
236    /// ```
237    ///
238    /// You could pass the following [`RowSelection`]:
239    ///
240    /// ```text
241    ///  Select 1000    (scan all rows in row group 0)
242    ///  Skip 50        (skip the first 50 rows in row group 2)
243    ///  Select 50      (scan rows 50-100 in row group 2)
244    ///  Skip 900       (skip the remaining rows in row group 2)
245    ///  Skip 200       (skip the first 200 rows in row group 3)
246    ///  Select 100     (scan rows 200-300 in row group 3)
247    ///  Skip 700       (skip the remaining rows in row group 3)
248    /// ```
249    /// Note there is no entry for the (entirely) skipped row group 1.
250    ///
251    /// Note you can represent the same selection with fewer entries. Instead of
252    ///
253    /// ```text
254    ///  Skip 900       (skip the remaining rows in row group 2)
255    ///  Skip 200       (skip the first 200 rows in row group 3)
256    /// ```
257    ///
258    /// you could use
259    ///
260    /// ```text
261    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
262    /// ```
263    ///
264    /// [`Index`]: crate::file::page_index::index::Index
265    pub fn with_row_selection(self, selection: RowSelection) -> Self {
266        Self {
267            selection: Some(selection),
268            ..self
269        }
270    }
271
272    /// Provide a [`RowFilter`] to skip decoding rows
273    ///
274    /// Row filters are applied after row group selection and row selection
275    ///
276    /// It is recommended to enable reading the page index if using this functionality, to allow
277    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
278    pub fn with_row_filter(self, filter: RowFilter) -> Self {
279        Self {
280            filter: Some(filter),
281            ..self
282        }
283    }
284
285    /// Provide a limit to the number of rows to be read
286    ///
287    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
288    /// allowing it to limit the final set of rows decoded after any pushed down predicates
289    ///
290    /// It is recommended to enable reading the page index if using this functionality, to allow
291    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
292    pub fn with_limit(self, limit: usize) -> Self {
293        Self {
294            limit: Some(limit),
295            ..self
296        }
297    }
298
299    /// Provide an offset to skip over the given number of rows
300    ///
301    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
302    /// allowing it to skip rows after any pushed down predicates
303    ///
304    /// It is recommended to enable reading the page index if using this functionality, to allow
305    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
306    pub fn with_offset(self, offset: usize) -> Self {
307        Self {
308            offset: Some(offset),
309            ..self
310        }
311    }
312
313    /// Specify metrics collection during reading
314    ///
315    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
316    /// clone of the provided metrics to the builder.
317    ///
318    /// For example:
319    ///
320    /// ```rust
321    /// # use std::sync::Arc;
322    /// # use bytes::Bytes;
323    /// # use arrow_array::{Int32Array, RecordBatch};
324    /// # use arrow_schema::{DataType, Field, Schema};
325    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
326    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
327    /// # use parquet::arrow::ArrowWriter;
328    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
329    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
330    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
331    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
332    /// # writer.write(&batch).unwrap();
333    /// # writer.close().unwrap();
334    /// # let file = Bytes::from(file);
335    /// // Create metrics object to pass into the reader
336    /// let metrics = ArrowReaderMetrics::enabled();
337    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
338    ///   // Configure the builder to use the metrics by passing a clone
339    ///   .with_metrics(metrics.clone())
340    ///   // Build the reader
341    ///   .build().unwrap();
342    /// // .. read data from the reader ..
343    ///
344    /// // check the metrics
345    /// assert!(metrics.records_read_from_inner().is_some());
346    /// ```
347    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
348        Self { metrics, ..self }
349    }
350
351    /// Set the maximum size (per row group) of the predicate cache in bytes for
352    /// the async decoder.
353    ///
354    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
355    /// unlimited cache size.
356    ///
357    /// This cache is used to store decoded arrays that are used in
358    /// predicate evaluation ([`Self::with_row_filter`]).
359    ///
360    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
361    /// [this ticket] for more details and alternatives.
362    ///
363    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
364    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
365    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
366        Self {
367            max_predicate_cache_size,
368            ..self
369        }
370    }
371}
372
373/// Options that control how metadata is read for a parquet file
374///
375/// See [`ArrowReaderBuilder`] for how to configure how the column data
376/// is then read from the file, including projection and filter pushdown
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    ///
    /// This is also set to `true` by [`Self::with_schema`].
    skip_arrow_metadata: bool,
    /// If provided used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
    ///
    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
    supplied_schema: Option<SchemaRef>,
    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
    ///
    /// See [`Self::with_page_index`].
    pub(crate) page_index: bool,
    /// If encryption is enabled, the file decryption properties can be provided
    ///
    /// See [`Self::with_file_decryption_properties`].
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}
392
393impl ArrowReaderOptions {
394    /// Create a new [`ArrowReaderOptions`] with the default settings
395    pub fn new() -> Self {
396        Self::default()
397    }
398
399    /// Skip decoding the embedded arrow metadata (defaults to `false`)
400    ///
401    /// Parquet files generated by some writers may contain embedded arrow
402    /// schema and metadata.
403    /// This may not be correct or compatible with your system,
404    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
405    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
406        Self {
407            skip_arrow_metadata,
408            ..self
409        }
410    }
411
412    /// Provide a schema hint to use when reading the Parquet file.
413    ///
414    /// If provided, this schema takes precedence over any arrow schema embedded
415    /// in the metadata (see the [`arrow`] documentation for more details).
416    ///
417    /// If the provided schema is not compatible with the data stored in the
418    /// parquet file schema, an error will be returned when constructing the
419    /// builder.
420    ///
421    /// This option is only required if you want to explicitly control the
422    /// conversion of Parquet types to Arrow types, such as casting a column to
423    /// a different type. For example, if you wanted to read an Int64 in
424    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
425    ///
426    /// [`arrow`]: crate::arrow
427    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
428    ///
429    /// # Notes
430    ///
431    /// The provided schema must have the same number of columns as the parquet schema and
432    /// the column names must be the same.
433    ///
434    /// # Example
435    /// ```
436    /// use std::io::Bytes;
437    /// use std::sync::Arc;
438    /// use tempfile::tempfile;
439    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
440    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
441    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
442    /// use parquet::arrow::ArrowWriter;
443    ///
444    /// // Write data - schema is inferred from the data to be Int32
445    /// let file = tempfile().unwrap();
446    /// let batch = RecordBatch::try_from_iter(vec![
447    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
448    /// ]).unwrap();
449    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
450    /// writer.write(&batch).unwrap();
451    /// writer.close().unwrap();
452    ///
453    /// // Read the file back.
454    /// // Supply a schema that interprets the Int32 column as a Timestamp.
455    /// let supplied_schema = Arc::new(Schema::new(vec![
456    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
457    /// ]));
458    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
459    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
460    ///     file.try_clone().unwrap(),
461    ///     options
462    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
463    ///
464    /// // Create the reader and read the data using the supplied schema.
465    /// let mut reader = builder.build().unwrap();
466    /// let _batch = reader.next().unwrap().unwrap();
467    /// ```
468    pub fn with_schema(self, schema: SchemaRef) -> Self {
469        Self {
470            supplied_schema: Some(schema),
471            skip_arrow_metadata: true,
472            ..self
473        }
474    }
475
476    /// Enable reading [`PageIndex`], if present (defaults to `false`)
477    ///
478    /// The `PageIndex` can be used to push down predicates to the parquet scan,
479    /// potentially eliminating unnecessary IO, by some query engines.
480    ///
481    /// If this is enabled, [`ParquetMetaData::column_index`] and
482    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
483    /// information is present in the file.
484    ///
485    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
486    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
487    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
488    pub fn with_page_index(self, page_index: bool) -> Self {
489        Self { page_index, ..self }
490    }
491
492    /// Provide the file decryption properties to use when reading encrypted parquet files.
493    ///
494    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
495    #[cfg(feature = "encryption")]
496    pub fn with_file_decryption_properties(
497        self,
498        file_decryption_properties: FileDecryptionProperties,
499    ) -> Self {
500        Self {
501            file_decryption_properties: Some(file_decryption_properties),
502            ..self
503        }
504    }
505
    /// Retrieve the currently set page index behavior.
    ///
    /// Returns the flag last set via
    /// [`with_page_index`][Self::with_page_index].
    pub fn page_index(&self) -> bool {
        self.page_index
    }
512
    /// Retrieve the currently set file decryption properties.
    ///
    /// Returns `None` if no properties have been provided. They can be set via
    /// [`with_file_decryption_properties`][Self::with_file_decryption_properties].
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
521}
522
523/// The metadata necessary to construct a [`ArrowReaderBuilder`]
524///
525/// Note this structure is cheaply clone-able as it consists of several arcs.
526///
527/// This structure allows
528///
529/// 1. Loading metadata for a file once and then using that same metadata to
530///    construct multiple separate readers, for example, to distribute readers
531///    across multiple threads
532///
533/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
534///    from the file each time a reader is constructed.
535///
536/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet metadata for the file
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema
    pub(crate) schema: SchemaRef,

    /// Field information produced alongside `schema` when the Parquet schema
    /// is converted to Arrow, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}
546
547impl ArrowReaderMetadata {
548    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
549    ///
550    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
551    /// example of how this can be used
552    ///
553    /// # Notes
554    ///
555    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
556    /// `Self::metadata` is missing the page index, this function will attempt
557    /// to load the page index by making an object store request.
558    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
559        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
560        #[cfg(feature = "encryption")]
561        let metadata =
562            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
563        let metadata = metadata.parse_and_finish(reader)?;
564        Self::try_new(Arc::new(metadata), options)
565    }
566
567    /// Create a new [`ArrowReaderMetadata`]
568    ///
569    /// # Notes
570    ///
571    /// This function does not attempt to load the PageIndex if not present in the metadata.
572    /// See [`Self::load`] for more details.
573    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
574        match options.supplied_schema {
575            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
576            None => {
577                let kv_metadata = match options.skip_arrow_metadata {
578                    true => None,
579                    false => metadata.file_metadata().key_value_metadata(),
580                };
581
582                let (schema, fields) = parquet_to_arrow_schema_and_fields(
583                    metadata.file_metadata().schema_descr(),
584                    ProjectionMask::all(),
585                    kv_metadata,
586                )?;
587
588                Ok(Self {
589                    metadata,
590                    schema: Arc::new(schema),
591                    fields: fields.map(Arc::new),
592                })
593            }
594        }
595    }
596
597    fn with_supplied_schema(
598        metadata: Arc<ParquetMetaData>,
599        supplied_schema: SchemaRef,
600    ) -> Result<Self> {
601        let parquet_schema = metadata.file_metadata().schema_descr();
602        let field_levels = parquet_to_arrow_field_levels(
603            parquet_schema,
604            ProjectionMask::all(),
605            Some(supplied_schema.fields()),
606        )?;
607        let fields = field_levels.fields;
608        let inferred_len = fields.len();
609        let supplied_len = supplied_schema.fields().len();
610        // Ensure the supplied schema has the same number of columns as the parquet schema.
611        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
612        // different lengths, but we check here to be safe.
613        if inferred_len != supplied_len {
614            return Err(arrow_err!(format!(
615                "Incompatible supplied Arrow schema: expected {} columns received {}",
616                inferred_len, supplied_len
617            )));
618        }
619
620        let mut errors = Vec::new();
621
622        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
623
624        for (field1, field2) in field_iter {
625            if field1.data_type() != field2.data_type() {
626                errors.push(format!(
627                    "data type mismatch for field {}: requested {:?} but found {:?}",
628                    field1.name(),
629                    field1.data_type(),
630                    field2.data_type()
631                ));
632            }
633            if field1.is_nullable() != field2.is_nullable() {
634                errors.push(format!(
635                    "nullability mismatch for field {}: expected {:?} but found {:?}",
636                    field1.name(),
637                    field1.is_nullable(),
638                    field2.is_nullable()
639                ));
640            }
641            if field1.metadata() != field2.metadata() {
642                errors.push(format!(
643                    "metadata mismatch for field {}: expected {:?} but found {:?}",
644                    field1.name(),
645                    field1.metadata(),
646                    field2.metadata()
647                ));
648            }
649        }
650
651        if !errors.is_empty() {
652            let message = errors.join(", ");
653            return Err(ParquetError::ArrowError(format!(
654                "Incompatible supplied Arrow schema: {message}",
655            )));
656        }
657
658        Ok(Self {
659            metadata,
660            schema: supplied_schema,
661            fields: field_levels.levels.map(Arc::new),
662        })
663    }
664
    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    ///
    /// The metadata is held in an [`Arc`], so the returned handle can be
    /// cloned without copying the metadata itself.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }
669
    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    ///
    /// The descriptor is taken from the file metadata held by this structure.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }
674
    /// Returns the arrow [`SchemaRef`] for this parquet file
    ///
    /// This is either the schema supplied via the reader options or the one
    /// derived from the parquet schema when this structure was created.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
679}
680
#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);
684
685impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
686    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
687        f.debug_tuple("SyncReader").field(&self.0).finish()
688    }
689}
690
/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
///
/// This is an [`ArrowReaderBuilder`] whose input is wrapped in a [`SyncReader`].
///
/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
697
698impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
699    /// Create a new [`ParquetRecordBatchReaderBuilder`]
700    ///
701    /// ```
702    /// # use std::sync::Arc;
703    /// # use bytes::Bytes;
704    /// # use arrow_array::{Int32Array, RecordBatch};
705    /// # use arrow_schema::{DataType, Field, Schema};
706    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
707    /// # use parquet::arrow::ArrowWriter;
708    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
709    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
710    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
711    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
712    /// # writer.write(&batch).unwrap();
713    /// # writer.close().unwrap();
714    /// # let file = Bytes::from(file);
715    /// #
716    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
717    ///
718    /// // Inspect metadata
719    /// assert_eq!(builder.metadata().num_row_groups(), 1);
720    ///
721    /// // Construct reader
722    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
723    ///
724    /// // Read data
725    /// let _batch = reader.next().unwrap().unwrap();
726    /// ```
727    pub fn try_new(reader: T) -> Result<Self> {
728        Self::try_new_with_options(reader, Default::default())
729    }
730
731    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
732    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
733        let metadata = ArrowReaderMetadata::load(&reader, options)?;
734        Ok(Self::new_with_metadata(reader, metadata))
735    }
736
737    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
738    ///
739    /// This interface allows:
740    ///
741    /// 1. Loading metadata once and using it to create multiple builders with
742    ///    potentially different settings or run on different threads
743    ///
744    /// 2. Using a cached copy of the metadata rather than re-reading it from the
745    ///    file each time a reader is constructed.
746    ///
747    /// See the docs on [`ArrowReaderMetadata`] for more details
748    ///
749    /// # Example
750    /// ```
751    /// # use std::fs::metadata;
752    /// # use std::sync::Arc;
753    /// # use bytes::Bytes;
754    /// # use arrow_array::{Int32Array, RecordBatch};
755    /// # use arrow_schema::{DataType, Field, Schema};
756    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
757    /// # use parquet::arrow::ArrowWriter;
758    /// #
759    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
760    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
761    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
762    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
763    /// # writer.write(&batch).unwrap();
764    /// # writer.close().unwrap();
765    /// # let file = Bytes::from(file);
766    /// #
767    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
768    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
769    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
770    ///
771    /// // Should be able to read from both in parallel
772    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
773    /// ```
774    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
775        Self::new_builder(SyncReader(input), metadata)
776    }
777
778    /// Read bloom filter for a column in a row group
779    ///
780    /// Returns `None` if the column does not have a bloom filter
781    ///
782    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
783    pub fn get_row_group_column_bloom_filter(
784        &mut self,
785        row_group_idx: usize,
786        column_idx: usize,
787    ) -> Result<Option<Sbbf>> {
788        let metadata = self.metadata.row_group(row_group_idx);
789        let column_metadata = metadata.column(column_idx);
790
791        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
792            offset
793                .try_into()
794                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
795        } else {
796            return Ok(None);
797        };
798
799        let buffer = match column_metadata.bloom_filter_length() {
800            Some(length) => self.input.0.get_bytes(offset, length as usize),
801            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
802        }?;
803
804        let (header, bitset_offset) =
805            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
806
807        match header.algorithm {
808            BloomFilterAlgorithm::BLOCK(_) => {
809                // this match exists to future proof the singleton algorithm enum
810            }
811        }
812        match header.compression {
813            BloomFilterCompression::UNCOMPRESSED(_) => {
814                // this match exists to future proof the singleton compression enum
815            }
816        }
817        match header.hash {
818            BloomFilterHash::XXHASH(_) => {
819                // this match exists to future proof the singleton hash enum
820            }
821        }
822
823        let bitset = match column_metadata.bloom_filter_length() {
824            Some(_) => buffer.slice(
825                (TryInto::<usize>::try_into(bitset_offset).unwrap()
826                    - TryInto::<usize>::try_into(offset).unwrap())..,
827            ),
828            None => {
829                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
830                    ParquetError::General("Bloom filter length is invalid".to_string())
831                })?;
832                self.input.0.get_bytes(bitset_offset, bitset_length)?
833            }
834        };
835        Ok(Some(Sbbf::new(&bitset)))
836    }
837
838    /// Build a [`ParquetRecordBatchReader`]
839    ///
840    /// Note: this will eagerly evaluate any `RowFilter` before returning
841    pub fn build(self) -> Result<ParquetRecordBatchReader> {
842        let Self {
843            input,
844            metadata,
845            schema: _,
846            fields,
847            batch_size: _,
848            row_groups,
849            projection,
850            mut filter,
851            selection,
852            limit,
853            offset,
854            metrics,
855            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
856            max_predicate_cache_size: _,
857        } = self;
858
859        // Try to avoid allocate large buffer
860        let batch_size = self
861            .batch_size
862            .min(metadata.file_metadata().num_rows() as usize);
863
864        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
865
866        let reader = ReaderRowGroups {
867            reader: Arc::new(input.0),
868            metadata,
869            row_groups,
870        };
871
872        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
873
874        // Update selection based on any filters
875        if let Some(filter) = filter.as_mut() {
876            for predicate in filter.predicates.iter_mut() {
877                // break early if we have ruled out all rows
878                if !plan_builder.selects_any() {
879                    break;
880                }
881
882                let mut cache_projection = predicate.projection().clone();
883                cache_projection.intersect(&projection);
884
885                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
886                    .build_array_reader(fields.as_deref(), predicate.projection())?;
887
888                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
889            }
890        }
891
892        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
893            .build_array_reader(fields.as_deref(), &projection)?;
894
895        let read_plan = plan_builder
896            .limited(reader.num_rows())
897            .with_offset(offset)
898            .with_limit(limit)
899            .build_limited()
900            .build();
901
902        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
903    }
904}
905
/// A [`RowGroups`] implementation over a [`ChunkReader`], restricted to a
/// chosen list of row groups
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared handle to the underlying input
    reader: Arc<T>,

    /// Parsed metadata of the file being read
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to scan, visited in this order
    row_groups: Vec<usize>,
}
913
914impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
915    fn num_rows(&self) -> usize {
916        let meta = self.metadata.row_groups();
917        self.row_groups
918            .iter()
919            .map(|x| meta[*x].num_rows() as usize)
920            .sum()
921    }
922
923    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
924        Ok(Box::new(ReaderPageIterator {
925            column_idx: i,
926            reader: self.reader.clone(),
927            metadata: self.metadata.clone(),
928            row_groups: self.row_groups.clone().into_iter(),
929        }))
930    }
931}
932
/// Iterates over the selected row groups, yielding a page reader for a single
/// column in each of them
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared handle to the underlying input
    reader: Arc<T>,
    /// Index of the column whose pages are read
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// Parsed metadata of the file being read
    metadata: Arc<ParquetMetaData>,
}
939
940impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
941    /// Return the next SerializedPageReader
942    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
943        let rg = self.metadata.row_group(rg_idx);
944        let column_chunk_metadata = rg.column(self.column_idx);
945        let offset_index = self.metadata.offset_index();
946        // `offset_index` may not exist and `i[rg_idx]` will be empty.
947        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
948        let page_locations = offset_index
949            .filter(|i| !i[rg_idx].is_empty())
950            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
951        let total_rows = rg.num_rows() as usize;
952        let reader = self.reader.clone();
953
954        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
955            .add_crypto_context(
956                rg_idx,
957                self.column_idx,
958                self.metadata.as_ref(),
959                column_chunk_metadata,
960            )
961    }
962}
963
964impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
965    type Item = Result<Box<dyn PageReader>>;
966
967    fn next(&mut self) -> Option<Self::Item> {
968        let rg_idx = self.row_groups.next()?;
969        let page_reader = self
970            .next_page_reader(rg_idx)
971            .map(|page_reader| Box::new(page_reader) as _);
972        Some(page_reader)
973    }
974}
975
// No methods need to be provided here; the `Iterator` impl for
// `ReaderPageIterator` supplies the behaviour.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
977
/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
    /// Source of decoded arrays; every batch is taken from its `consume_batch` output
    array_reader: Box<dyn ArrayReader>,
    /// Schema returned by [`RecordBatchReader::schema`]
    schema: SchemaRef,
    /// Batch size and optional row selection that drive each read
    read_plan: ReadPlan,
}
985
986impl Iterator for ParquetRecordBatchReader {
987    type Item = Result<RecordBatch, ArrowError>;
988
989    fn next(&mut self) -> Option<Self::Item> {
990        self.next_inner()
991            .map_err(|arrow_err| arrow_err.into())
992            .transpose()
993    }
994}
995
996impl ParquetRecordBatchReader {
997    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
998    /// has reached the end of the file.
999    ///
1000    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1001    /// simplify error handling with `?`
1002    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1003        let mut read_records = 0;
1004        let batch_size = self.batch_size();
1005        match self.read_plan.selection_mut() {
1006            Some(selection) => {
1007                while read_records < batch_size && !selection.is_empty() {
1008                    let front = selection.pop_front().unwrap();
1009                    if front.skip {
1010                        let skipped = self.array_reader.skip_records(front.row_count)?;
1011
1012                        if skipped != front.row_count {
1013                            return Err(general_err!(
1014                                "failed to skip rows, expected {}, got {}",
1015                                front.row_count,
1016                                skipped
1017                            ));
1018                        }
1019                        continue;
1020                    }
1021
1022                    //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
1023                    //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1024                    if front.row_count == 0 {
1025                        continue;
1026                    }
1027
1028                    // try to read record
1029                    let need_read = batch_size - read_records;
1030                    let to_read = match front.row_count.checked_sub(need_read) {
1031                        Some(remaining) if remaining != 0 => {
1032                            // if page row count less than batch_size we must set batch size to page row count.
1033                            // add check avoid dead loop
1034                            selection.push_front(RowSelector::select(remaining));
1035                            need_read
1036                        }
1037                        _ => front.row_count,
1038                    };
1039                    match self.array_reader.read_records(to_read)? {
1040                        0 => break,
1041                        rec => read_records += rec,
1042                    };
1043                }
1044            }
1045            None => {
1046                self.array_reader.read_records(batch_size)?;
1047            }
1048        };
1049
1050        let array = self.array_reader.consume_batch()?;
1051        let struct_array = array.as_struct_opt().ok_or_else(|| {
1052            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1053        })?;
1054
1055        Ok(if struct_array.len() > 0 {
1056            Some(RecordBatch::from(struct_array))
1057        } else {
1058            None
1059        })
1060    }
1061}
1062
1063impl RecordBatchReader for ParquetRecordBatchReader {
1064    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1065    ///
1066    /// Note that the schema metadata will be stripped here. See
1067    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1068    fn schema(&self) -> SchemaRef {
1069        self.schema.clone()
1070    }
1071}
1072
1073impl ParquetRecordBatchReader {
1074    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1075    ///
1076    /// See [`ParquetRecordBatchReaderBuilder`] for more options
1077    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1078        ParquetRecordBatchReaderBuilder::try_new(reader)?
1079            .with_batch_size(batch_size)
1080            .build()
1081    }
1082
1083    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1084    ///
1085    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
1086    /// higher-level interface for reading parquet data from a file
1087    pub fn try_new_with_row_groups(
1088        levels: &FieldLevels,
1089        row_groups: &dyn RowGroups,
1090        batch_size: usize,
1091        selection: Option<RowSelection>,
1092    ) -> Result<Self> {
1093        // note metrics are not supported in this API
1094        let metrics = ArrowReaderMetrics::disabled();
1095        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1096            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1097
1098        let read_plan = ReadPlanBuilder::new(batch_size)
1099            .with_selection(selection)
1100            .build();
1101
1102        Ok(Self {
1103            array_reader,
1104            schema: Arc::new(Schema::new(levels.fields.clone())),
1105            read_plan,
1106        })
1107    }
1108
1109    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1110    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1111    /// all rows will be returned
1112    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1113        let schema = match array_reader.get_data_type() {
1114            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
1115            _ => unreachable!("Struct array reader's data type is not struct!"),
1116        };
1117
1118        Self {
1119            array_reader,
1120            schema: Arc::new(schema),
1121            read_plan,
1122        }
1123    }
1124
1125    #[inline(always)]
1126    pub(crate) fn batch_size(&self) -> usize {
1127        self.read_plan.batch_size()
1128    }
1129}
1130
1131#[cfg(test)]
1132mod tests {
1133    use std::cmp::min;
1134    use std::collections::{HashMap, VecDeque};
1135    use std::fmt::Formatter;
1136    use std::fs::File;
1137    use std::io::Seek;
1138    use std::path::PathBuf;
1139    use std::sync::Arc;
1140
1141    use arrow_array::builder::*;
1142    use arrow_array::cast::AsArray;
1143    use arrow_array::types::{
1144        Date32Type, Date64Type, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
1145        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1146        Time64MicrosecondType,
1147    };
1148    use arrow_array::*;
1149    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
1150    use arrow_data::{ArrayData, ArrayDataBuilder};
1151    use arrow_schema::{
1152        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1153    };
1154    use arrow_select::concat::concat_batches;
1155    use bytes::Bytes;
1156    use half::f16;
1157    use num::PrimInt;
1158    use rand::{rng, Rng, RngCore};
1159    use tempfile::tempfile;
1160
1161    use crate::arrow::arrow_reader::{
1162        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1163        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1164    };
1165    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1166    use crate::arrow::{ArrowWriter, ProjectionMask};
1167    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1168    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1169    use crate::data_type::{
1170        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1171        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1172    };
1173    use crate::errors::Result;
1174    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1175    use crate::file::writer::SerializedFileWriter;
1176    use crate::schema::parser::parse_message_type;
1177    use crate::schema::types::{Type, TypePtr};
1178    use crate::util::test_common::rand_gen::RandGen;
1179
1180    #[test]
1181    fn test_arrow_reader_all_columns() {
1182        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1183
1184        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1185        let original_schema = Arc::clone(builder.schema());
1186        let reader = builder.build().unwrap();
1187
1188        // Verify that the schema was correctly parsed
1189        assert_eq!(original_schema.fields(), reader.schema().fields());
1190    }
1191
1192    #[test]
1193    fn test_arrow_reader_single_column() {
1194        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1195
1196        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1197        let original_schema = Arc::clone(builder.schema());
1198
1199        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1200        let reader = builder.with_projection(mask).build().unwrap();
1201
1202        // Verify that the schema was correctly parsed
1203        assert_eq!(1, reader.schema().fields().len());
1204        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1205    }
1206
1207    #[test]
1208    fn test_arrow_reader_single_column_by_name() {
1209        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1210
1211        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1212        let original_schema = Arc::clone(builder.schema());
1213
1214        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1215        let reader = builder.with_projection(mask).build().unwrap();
1216
1217        // Verify that the schema was correctly parsed
1218        assert_eq!(1, reader.schema().fields().len());
1219        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1220    }
1221
1222    #[test]
1223    fn test_null_column_reader_test() {
1224        let mut file = tempfile::tempfile().unwrap();
1225
1226        let schema = "
1227            message message {
1228                OPTIONAL INT32 int32;
1229            }
1230        ";
1231        let schema = Arc::new(parse_message_type(schema).unwrap());
1232
1233        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1234        generate_single_column_file_with_data::<Int32Type>(
1235            &[vec![], vec![]],
1236            Some(&def_levels),
1237            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1238            schema,
1239            Some(Field::new("int32", ArrowDataType::Null, true)),
1240            &Default::default(),
1241        )
1242        .unwrap();
1243
1244        file.rewind().unwrap();
1245
1246        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1247        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1248
1249        assert_eq!(batches.len(), 4);
1250        for batch in &batches[0..3] {
1251            assert_eq!(batch.num_rows(), 2);
1252            assert_eq!(batch.num_columns(), 1);
1253            assert_eq!(batch.column(0).null_count(), 2);
1254        }
1255
1256        assert_eq!(batches[3].num_rows(), 1);
1257        assert_eq!(batches[3].num_columns(), 1);
1258        assert_eq!(batches[3].column(0).null_count(), 1);
1259    }
1260
1261    #[test]
1262    fn test_primitive_single_column_reader_test() {
1263        run_single_column_reader_tests::<BoolType, _, BoolType>(
1264            2,
1265            ConvertedType::NONE,
1266            None,
1267            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1268            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1269        );
1270        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1271            2,
1272            ConvertedType::NONE,
1273            None,
1274            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1275            &[
1276                Encoding::PLAIN,
1277                Encoding::RLE_DICTIONARY,
1278                Encoding::DELTA_BINARY_PACKED,
1279                Encoding::BYTE_STREAM_SPLIT,
1280            ],
1281        );
1282        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1283            2,
1284            ConvertedType::NONE,
1285            None,
1286            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1287            &[
1288                Encoding::PLAIN,
1289                Encoding::RLE_DICTIONARY,
1290                Encoding::DELTA_BINARY_PACKED,
1291                Encoding::BYTE_STREAM_SPLIT,
1292            ],
1293        );
1294        run_single_column_reader_tests::<FloatType, _, FloatType>(
1295            2,
1296            ConvertedType::NONE,
1297            None,
1298            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1299            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1300        );
1301    }
1302
1303    #[test]
1304    fn test_unsigned_primitive_single_column_reader_test() {
1305        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1306            2,
1307            ConvertedType::UINT_32,
1308            Some(ArrowDataType::UInt32),
1309            |vals| {
1310                Arc::new(UInt32Array::from_iter(
1311                    vals.iter().map(|x| x.map(|x| x as u32)),
1312                ))
1313            },
1314            &[
1315                Encoding::PLAIN,
1316                Encoding::RLE_DICTIONARY,
1317                Encoding::DELTA_BINARY_PACKED,
1318            ],
1319        );
1320        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1321            2,
1322            ConvertedType::UINT_64,
1323            Some(ArrowDataType::UInt64),
1324            |vals| {
1325                Arc::new(UInt64Array::from_iter(
1326                    vals.iter().map(|x| x.map(|x| x as u64)),
1327                ))
1328            },
1329            &[
1330                Encoding::PLAIN,
1331                Encoding::RLE_DICTIONARY,
1332                Encoding::DELTA_BINARY_PACKED,
1333            ],
1334        );
1335    }
1336
1337    #[test]
1338    fn test_unsigned_roundtrip() {
1339        let schema = Arc::new(Schema::new(vec![
1340            Field::new("uint32", ArrowDataType::UInt32, true),
1341            Field::new("uint64", ArrowDataType::UInt64, true),
1342        ]));
1343
1344        let mut buf = Vec::with_capacity(1024);
1345        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1346
1347        let original = RecordBatch::try_new(
1348            schema,
1349            vec![
1350                Arc::new(UInt32Array::from_iter_values([
1351                    0,
1352                    i32::MAX as u32,
1353                    u32::MAX,
1354                ])),
1355                Arc::new(UInt64Array::from_iter_values([
1356                    0,
1357                    i64::MAX as u64,
1358                    u64::MAX,
1359                ])),
1360            ],
1361        )
1362        .unwrap();
1363
1364        writer.write(&original).unwrap();
1365        writer.close().unwrap();
1366
1367        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1368        let ret = reader.next().unwrap().unwrap();
1369        assert_eq!(ret, original);
1370
1371        // Check they can be downcast to the correct type
1372        ret.column(0)
1373            .as_any()
1374            .downcast_ref::<UInt32Array>()
1375            .unwrap();
1376
1377        ret.column(1)
1378            .as_any()
1379            .downcast_ref::<UInt64Array>()
1380            .unwrap();
1381    }
1382
1383    #[test]
1384    fn test_float16_roundtrip() -> Result<()> {
1385        let schema = Arc::new(Schema::new(vec![
1386            Field::new("float16", ArrowDataType::Float16, false),
1387            Field::new("float16-nullable", ArrowDataType::Float16, true),
1388        ]));
1389
1390        let mut buf = Vec::with_capacity(1024);
1391        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1392
1393        let original = RecordBatch::try_new(
1394            schema,
1395            vec![
1396                Arc::new(Float16Array::from_iter_values([
1397                    f16::EPSILON,
1398                    f16::MIN,
1399                    f16::MAX,
1400                    f16::NAN,
1401                    f16::INFINITY,
1402                    f16::NEG_INFINITY,
1403                    f16::ONE,
1404                    f16::NEG_ONE,
1405                    f16::ZERO,
1406                    f16::NEG_ZERO,
1407                    f16::E,
1408                    f16::PI,
1409                    f16::FRAC_1_PI,
1410                ])),
1411                Arc::new(Float16Array::from(vec![
1412                    None,
1413                    None,
1414                    None,
1415                    Some(f16::NAN),
1416                    Some(f16::INFINITY),
1417                    Some(f16::NEG_INFINITY),
1418                    None,
1419                    None,
1420                    None,
1421                    None,
1422                    None,
1423                    None,
1424                    Some(f16::FRAC_1_PI),
1425                ])),
1426            ],
1427        )?;
1428
1429        writer.write(&original)?;
1430        writer.close()?;
1431
1432        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1433        let ret = reader.next().unwrap()?;
1434        assert_eq!(ret, original);
1435
1436        // Ensure can be downcast to the correct type
1437        ret.column(0).as_primitive::<Float16Type>();
1438        ret.column(1).as_primitive::<Float16Type>();
1439
1440        Ok(())
1441    }
1442
1443    #[test]
1444    fn test_time_utc_roundtrip() -> Result<()> {
1445        let schema = Arc::new(Schema::new(vec![
1446            Field::new(
1447                "time_millis",
1448                ArrowDataType::Time32(TimeUnit::Millisecond),
1449                true,
1450            )
1451            .with_metadata(HashMap::from_iter(vec![(
1452                "adjusted_to_utc".to_string(),
1453                "".to_string(),
1454            )])),
1455            Field::new(
1456                "time_micros",
1457                ArrowDataType::Time64(TimeUnit::Microsecond),
1458                true,
1459            )
1460            .with_metadata(HashMap::from_iter(vec![(
1461                "adjusted_to_utc".to_string(),
1462                "".to_string(),
1463            )])),
1464        ]));
1465
1466        let mut buf = Vec::with_capacity(1024);
1467        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1468
1469        let original = RecordBatch::try_new(
1470            schema,
1471            vec![
1472                Arc::new(Time32MillisecondArray::from(vec![
1473                    Some(-1),
1474                    Some(0),
1475                    Some(86_399_000),
1476                    Some(86_400_000),
1477                    Some(86_401_000),
1478                    None,
1479                ])),
1480                Arc::new(Time64MicrosecondArray::from(vec![
1481                    Some(-1),
1482                    Some(0),
1483                    Some(86_399 * 1_000_000),
1484                    Some(86_400 * 1_000_000),
1485                    Some(86_401 * 1_000_000),
1486                    None,
1487                ])),
1488            ],
1489        )?;
1490
1491        writer.write(&original)?;
1492        writer.close()?;
1493
1494        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1495        let ret = reader.next().unwrap()?;
1496        assert_eq!(ret, original);
1497
1498        // Ensure can be downcast to the correct type
1499        ret.column(0).as_primitive::<Time32MillisecondType>();
1500        ret.column(1).as_primitive::<Time64MicrosecondType>();
1501
1502        Ok(())
1503    }
1504
1505    #[test]
1506    fn test_date32_roundtrip() -> Result<()> {
1507        use arrow_array::Date32Array;
1508
1509        let schema = Arc::new(Schema::new(vec![Field::new(
1510            "date32",
1511            ArrowDataType::Date32,
1512            false,
1513        )]));
1514
1515        let mut buf = Vec::with_capacity(1024);
1516
1517        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1518
1519        let original = RecordBatch::try_new(
1520            schema,
1521            vec![Arc::new(Date32Array::from(vec![
1522                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1523            ]))],
1524        )?;
1525
1526        writer.write(&original)?;
1527        writer.close()?;
1528
1529        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1530        let ret = reader.next().unwrap()?;
1531        assert_eq!(ret, original);
1532
1533        // Ensure can be downcast to the correct type
1534        ret.column(0).as_primitive::<Date32Type>();
1535
1536        Ok(())
1537    }
1538
1539    #[test]
1540    fn test_date64_roundtrip() -> Result<()> {
1541        use arrow_array::Date64Array;
1542
1543        let schema = Arc::new(Schema::new(vec![
1544            Field::new("small-date64", ArrowDataType::Date64, false),
1545            Field::new("big-date64", ArrowDataType::Date64, false),
1546            Field::new("invalid-date64", ArrowDataType::Date64, false),
1547        ]));
1548
1549        let mut default_buf = Vec::with_capacity(1024);
1550        let mut coerce_buf = Vec::with_capacity(1024);
1551
1552        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1553
1554        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1555        let mut coerce_writer =
1556            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1557
1558        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1559
1560        let original = RecordBatch::try_new(
1561            schema,
1562            vec![
1563                // small-date64
1564                Arc::new(Date64Array::from(vec![
1565                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1566                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1567                    0,
1568                    1_000 * NUM_MILLISECONDS_IN_DAY,
1569                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1570                ])),
1571                // big-date64
1572                Arc::new(Date64Array::from(vec![
1573                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1574                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1575                    0,
1576                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1577                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1578                ])),
1579                // invalid-date64
1580                Arc::new(Date64Array::from(vec![
1581                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1582                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1583                    1,
1584                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1585                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1586                ])),
1587            ],
1588        )?;
1589
1590        default_writer.write(&original)?;
1591        coerce_writer.write(&original)?;
1592
1593        default_writer.close()?;
1594        coerce_writer.close()?;
1595
1596        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1597        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1598
1599        let default_ret = default_reader.next().unwrap()?;
1600        let coerce_ret = coerce_reader.next().unwrap()?;
1601
1602        // Roundtrip should be successful when default writer used
1603        assert_eq!(default_ret, original);
1604
1605        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1606        assert_eq!(coerce_ret.column(0), original.column(0));
1607        assert_ne!(coerce_ret.column(1), original.column(1));
1608        assert_ne!(coerce_ret.column(2), original.column(2));
1609
1610        // Ensure both can be downcast to the correct type
1611        default_ret.column(0).as_primitive::<Date64Type>();
1612        coerce_ret.column(0).as_primitive::<Date64Type>();
1613
1614        Ok(())
1615    }
1616    struct RandFixedLenGen {}
1617
1618    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1619        fn gen(len: i32) -> FixedLenByteArray {
1620            let mut v = vec![0u8; len as usize];
1621            rng().fill_bytes(&mut v);
1622            ByteArray::from(v).into()
1623        }
1624    }
1625
1626    #[test]
1627    fn test_fixed_length_binary_column_reader() {
1628        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1629            20,
1630            ConvertedType::NONE,
1631            None,
1632            |vals| {
1633                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1634                for val in vals {
1635                    match val {
1636                        Some(b) => builder.append_value(b).unwrap(),
1637                        None => builder.append_null(),
1638                    }
1639                }
1640                Arc::new(builder.finish())
1641            },
1642            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1643        );
1644    }
1645
1646    #[test]
1647    fn test_interval_day_time_column_reader() {
1648        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1649            12,
1650            ConvertedType::INTERVAL,
1651            None,
1652            |vals| {
1653                Arc::new(
1654                    vals.iter()
1655                        .map(|x| {
1656                            x.as_ref().map(|b| IntervalDayTime {
1657                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1658                                milliseconds: i32::from_le_bytes(
1659                                    b.as_ref()[8..12].try_into().unwrap(),
1660                                ),
1661                            })
1662                        })
1663                        .collect::<IntervalDayTimeArray>(),
1664                )
1665            },
1666            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1667        );
1668    }
1669
1670    #[test]
1671    fn test_int96_single_column_reader_test() {
1672        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1673
1674        type TypeHintAndConversionFunction =
1675            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1676
1677        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1678            // Test without a specified ArrowType hint.
1679            (None, |vals: &[Option<Int96>]| {
1680                Arc::new(TimestampNanosecondArray::from_iter(
1681                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1682                )) as ArrayRef
1683            }),
1684            // Test other TimeUnits as ArrowType hints.
1685            (
1686                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1687                |vals: &[Option<Int96>]| {
1688                    Arc::new(TimestampSecondArray::from_iter(
1689                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1690                    )) as ArrayRef
1691                },
1692            ),
1693            (
1694                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1695                |vals: &[Option<Int96>]| {
1696                    Arc::new(TimestampMillisecondArray::from_iter(
1697                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1698                    )) as ArrayRef
1699                },
1700            ),
1701            (
1702                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1703                |vals: &[Option<Int96>]| {
1704                    Arc::new(TimestampMicrosecondArray::from_iter(
1705                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1706                    )) as ArrayRef
1707                },
1708            ),
1709            (
1710                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1711                |vals: &[Option<Int96>]| {
1712                    Arc::new(TimestampNanosecondArray::from_iter(
1713                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1714                    )) as ArrayRef
1715                },
1716            ),
1717            // Test another timezone with TimeUnit as ArrowType hints.
1718            (
1719                Some(ArrowDataType::Timestamp(
1720                    TimeUnit::Second,
1721                    Some(Arc::from("-05:00")),
1722                )),
1723                |vals: &[Option<Int96>]| {
1724                    Arc::new(
1725                        TimestampSecondArray::from_iter(
1726                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1727                        )
1728                        .with_timezone("-05:00"),
1729                    ) as ArrayRef
1730                },
1731            ),
1732        ];
1733
1734        resolutions.iter().for_each(|(arrow_type, converter)| {
1735            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1736                2,
1737                ConvertedType::NONE,
1738                arrow_type.clone(),
1739                converter,
1740                encodings,
1741            );
1742        })
1743    }
1744
1745    #[test]
1746    fn test_int96_from_spark_file_with_provided_schema() {
1747        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1748        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
1749        // microsecond resolution for the Parquet reader to interpret these values correctly.
1750        use arrow_schema::DataType::Timestamp;
1751        let test_data = arrow::util::test_util::parquet_test_data();
1752        let path = format!("{test_data}/int96_from_spark.parquet");
1753        let file = File::open(path).unwrap();
1754
1755        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1756            "a",
1757            Timestamp(TimeUnit::Microsecond, None),
1758            true,
1759        )]));
1760        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1761
1762        let mut record_reader =
1763            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1764                .unwrap()
1765                .build()
1766                .unwrap();
1767
1768        let batch = record_reader.next().unwrap().unwrap();
1769        assert_eq!(batch.num_columns(), 1);
1770        let column = batch.column(0);
1771        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1772
1773        let expected = Arc::new(Int64Array::from(vec![
1774            Some(1704141296123456),
1775            Some(1704070800000000),
1776            Some(253402225200000000),
1777            Some(1735599600000000),
1778            None,
1779            Some(9089380393200000000),
1780        ]));
1781
1782        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1783        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1784        // anyway, so this should be a zero-copy non-modifying cast.
1785
1786        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1787        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1788
1789        assert_eq!(casted_timestamps.len(), expected.len());
1790
1791        casted_timestamps
1792            .iter()
1793            .zip(expected.iter())
1794            .for_each(|(lhs, rhs)| {
1795                assert_eq!(lhs, rhs);
1796            });
1797    }
1798
1799    #[test]
1800    fn test_int96_from_spark_file_without_provided_schema() {
1801        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
1802        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
1803        // values when read as nanosecond resolution overflow and result in garbage values.
1804        use arrow_schema::DataType::Timestamp;
1805        let test_data = arrow::util::test_util::parquet_test_data();
1806        let path = format!("{test_data}/int96_from_spark.parquet");
1807        let file = File::open(path).unwrap();
1808
1809        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1810            .unwrap()
1811            .build()
1812            .unwrap();
1813
1814        let batch = record_reader.next().unwrap().unwrap();
1815        assert_eq!(batch.num_columns(), 1);
1816        let column = batch.column(0);
1817        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1818
1819        let expected = Arc::new(Int64Array::from(vec![
1820            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1821            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1822            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1823            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1824            None,
1825            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1826        ]));
1827
1828        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1829        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1830        // anyway, so this should be a zero-copy non-modifying cast.
1831
1832        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1833        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1834
1835        assert_eq!(casted_timestamps.len(), expected.len());
1836
1837        casted_timestamps
1838            .iter()
1839            .zip(expected.iter())
1840            .for_each(|(lhs, rhs)| {
1841                assert_eq!(lhs, rhs);
1842            });
1843    }
1844
1845    struct RandUtf8Gen {}
1846
1847    impl RandGen<ByteArrayType> for RandUtf8Gen {
1848        fn gen(len: i32) -> ByteArray {
1849            Int32Type::gen(len).to_string().as_str().into()
1850        }
1851    }
1852
1853    #[test]
1854    fn test_utf8_single_column_reader_test() {
1855        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1856            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1857                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1858            })))
1859        }
1860
1861        let encodings = &[
1862            Encoding::PLAIN,
1863            Encoding::RLE_DICTIONARY,
1864            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1865            Encoding::DELTA_BYTE_ARRAY,
1866        ];
1867
1868        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1869            2,
1870            ConvertedType::NONE,
1871            None,
1872            |vals| {
1873                Arc::new(BinaryArray::from_iter(
1874                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1875                ))
1876            },
1877            encodings,
1878        );
1879
1880        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1881            2,
1882            ConvertedType::UTF8,
1883            None,
1884            string_converter::<i32>,
1885            encodings,
1886        );
1887
1888        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1889            2,
1890            ConvertedType::UTF8,
1891            Some(ArrowDataType::Utf8),
1892            string_converter::<i32>,
1893            encodings,
1894        );
1895
1896        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1897            2,
1898            ConvertedType::UTF8,
1899            Some(ArrowDataType::LargeUtf8),
1900            string_converter::<i64>,
1901            encodings,
1902        );
1903
1904        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1905        for key in &small_key_types {
1906            for encoding in encodings {
1907                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1908                opts.encoding = *encoding;
1909
1910                let data_type =
1911                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1912
1913                // Cannot run full test suite as keys overflow, run small test instead
1914                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1915                    opts,
1916                    2,
1917                    ConvertedType::UTF8,
1918                    Some(data_type.clone()),
1919                    move |vals| {
1920                        let vals = string_converter::<i32>(vals);
1921                        arrow::compute::cast(&vals, &data_type).unwrap()
1922                    },
1923                );
1924            }
1925        }
1926
1927        let key_types = [
1928            ArrowDataType::Int16,
1929            ArrowDataType::UInt16,
1930            ArrowDataType::Int32,
1931            ArrowDataType::UInt32,
1932            ArrowDataType::Int64,
1933            ArrowDataType::UInt64,
1934        ];
1935
1936        for key in &key_types {
1937            let data_type =
1938                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1939
1940            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1941                2,
1942                ConvertedType::UTF8,
1943                Some(data_type.clone()),
1944                move |vals| {
1945                    let vals = string_converter::<i32>(vals);
1946                    arrow::compute::cast(&vals, &data_type).unwrap()
1947                },
1948                encodings,
1949            );
1950
1951            // https://github.com/apache/arrow-rs/issues/1179
1952            // let data_type = ArrowDataType::Dictionary(
1953            //     Box::new(key.clone()),
1954            //     Box::new(ArrowDataType::LargeUtf8),
1955            // );
1956            //
1957            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1958            //     2,
1959            //     ConvertedType::UTF8,
1960            //     Some(data_type.clone()),
1961            //     move |vals| {
1962            //         let vals = string_converter::<i64>(vals);
1963            //         arrow::compute::cast(&vals, &data_type).unwrap()
1964            //     },
1965            //     encodings,
1966            // );
1967        }
1968    }
1969
1970    #[test]
1971    fn test_decimal_nullable_struct() {
1972        let decimals = Decimal256Array::from_iter_values(
1973            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1974        );
1975
1976        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1977            "decimals",
1978            decimals.data_type().clone(),
1979            false,
1980        )])))
1981        .len(8)
1982        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1983        .child_data(vec![decimals.into_data()])
1984        .build()
1985        .unwrap();
1986
1987        let written =
1988            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1989                .unwrap();
1990
1991        let mut buffer = Vec::with_capacity(1024);
1992        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1993        writer.write(&written).unwrap();
1994        writer.close().unwrap();
1995
1996        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1997            .unwrap()
1998            .collect::<Result<Vec<_>, _>>()
1999            .unwrap();
2000
2001        assert_eq!(&written.slice(0, 3), &read[0]);
2002        assert_eq!(&written.slice(3, 3), &read[1]);
2003        assert_eq!(&written.slice(6, 2), &read[2]);
2004    }
2005
2006    #[test]
2007    fn test_int32_nullable_struct() {
2008        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2009        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2010            "int32",
2011            int32.data_type().clone(),
2012            false,
2013        )])))
2014        .len(8)
2015        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2016        .child_data(vec![int32.into_data()])
2017        .build()
2018        .unwrap();
2019
2020        let written =
2021            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2022                .unwrap();
2023
2024        let mut buffer = Vec::with_capacity(1024);
2025        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2026        writer.write(&written).unwrap();
2027        writer.close().unwrap();
2028
2029        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2030            .unwrap()
2031            .collect::<Result<Vec<_>, _>>()
2032            .unwrap();
2033
2034        assert_eq!(&written.slice(0, 3), &read[0]);
2035        assert_eq!(&written.slice(3, 3), &read[1]);
2036        assert_eq!(&written.slice(6, 2), &read[2]);
2037    }
2038
2039    #[test]
2040    fn test_decimal_list() {
2041        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2042
2043        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2044        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2045            decimals.data_type().clone(),
2046            false,
2047        ))))
2048        .len(7)
2049        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2050        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2051        .child_data(vec![decimals.into_data()])
2052        .build()
2053        .unwrap();
2054
2055        let written =
2056            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2057                .unwrap();
2058
2059        let mut buffer = Vec::with_capacity(1024);
2060        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2061        writer.write(&written).unwrap();
2062        writer.close().unwrap();
2063
2064        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2065            .unwrap()
2066            .collect::<Result<Vec<_>, _>>()
2067            .unwrap();
2068
2069        assert_eq!(&written.slice(0, 3), &read[0]);
2070        assert_eq!(&written.slice(3, 3), &read[1]);
2071        assert_eq!(&written.slice(6, 1), &read[2]);
2072    }
2073
2074    #[test]
2075    fn test_read_decimal_file() {
2076        use arrow_array::Decimal128Array;
2077        let testdata = arrow::util::test_util::parquet_test_data();
2078        let file_variants = vec![
2079            ("byte_array", 4),
2080            ("fixed_length", 25),
2081            ("int32", 4),
2082            ("int64", 10),
2083        ];
2084        for (prefix, target_precision) in file_variants {
2085            let path = format!("{testdata}/{prefix}_decimal.parquet");
2086            let file = File::open(path).unwrap();
2087            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2088
2089            let batch = record_reader.next().unwrap().unwrap();
2090            assert_eq!(batch.num_rows(), 24);
2091            let col = batch
2092                .column(0)
2093                .as_any()
2094                .downcast_ref::<Decimal128Array>()
2095                .unwrap();
2096
2097            let expected = 1..25;
2098
2099            assert_eq!(col.precision(), target_precision);
2100            assert_eq!(col.scale(), 2);
2101
2102            for (i, v) in expected.enumerate() {
2103                assert_eq!(col.value(i), v * 100_i128);
2104            }
2105        }
2106    }
2107
2108    #[test]
2109    fn test_read_float16_nonzeros_file() {
2110        use arrow_array::Float16Array;
2111        let testdata = arrow::util::test_util::parquet_test_data();
2112        // see https://github.com/apache/parquet-testing/pull/40
2113        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2114        let file = File::open(path).unwrap();
2115        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2116
2117        let batch = record_reader.next().unwrap().unwrap();
2118        assert_eq!(batch.num_rows(), 8);
2119        let col = batch
2120            .column(0)
2121            .as_any()
2122            .downcast_ref::<Float16Array>()
2123            .unwrap();
2124
2125        let f16_two = f16::ONE + f16::ONE;
2126
2127        assert_eq!(col.null_count(), 1);
2128        assert!(col.is_null(0));
2129        assert_eq!(col.value(1), f16::ONE);
2130        assert_eq!(col.value(2), -f16_two);
2131        assert!(col.value(3).is_nan());
2132        assert_eq!(col.value(4), f16::ZERO);
2133        assert!(col.value(4).is_sign_positive());
2134        assert_eq!(col.value(5), f16::NEG_ONE);
2135        assert_eq!(col.value(6), f16::NEG_ZERO);
2136        assert!(col.value(6).is_sign_negative());
2137        assert_eq!(col.value(7), f16_two);
2138    }
2139
2140    #[test]
2141    fn test_read_float16_zeros_file() {
2142        use arrow_array::Float16Array;
2143        let testdata = arrow::util::test_util::parquet_test_data();
2144        // see https://github.com/apache/parquet-testing/pull/40
2145        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2146        let file = File::open(path).unwrap();
2147        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2148
2149        let batch = record_reader.next().unwrap().unwrap();
2150        assert_eq!(batch.num_rows(), 3);
2151        let col = batch
2152            .column(0)
2153            .as_any()
2154            .downcast_ref::<Float16Array>()
2155            .unwrap();
2156
2157        assert_eq!(col.null_count(), 1);
2158        assert!(col.is_null(0));
2159        assert_eq!(col.value(1), f16::ZERO);
2160        assert!(col.value(1).is_sign_positive());
2161        assert!(col.value(2).is_nan());
2162    }
2163
2164    #[test]
2165    fn test_read_float32_float64_byte_stream_split() {
2166        let path = format!(
2167            "{}/byte_stream_split.zstd.parquet",
2168            arrow::util::test_util::parquet_test_data(),
2169        );
2170        let file = File::open(path).unwrap();
2171        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2172
2173        let mut row_count = 0;
2174        for batch in record_reader {
2175            let batch = batch.unwrap();
2176            row_count += batch.num_rows();
2177            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2178            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2179
2180            // This file contains floats from a standard normal distribution
2181            for &x in f32_col.values() {
2182                assert!(x > -10.0);
2183                assert!(x < 10.0);
2184            }
2185            for &x in f64_col.values() {
2186                assert!(x > -10.0);
2187                assert!(x < 10.0);
2188            }
2189        }
2190        assert_eq!(row_count, 300);
2191    }
2192
2193    #[test]
2194    fn test_read_extended_byte_stream_split() {
2195        let path = format!(
2196            "{}/byte_stream_split_extended.gzip.parquet",
2197            arrow::util::test_util::parquet_test_data(),
2198        );
2199        let file = File::open(path).unwrap();
2200        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2201
2202        let mut row_count = 0;
2203        for batch in record_reader {
2204            let batch = batch.unwrap();
2205            row_count += batch.num_rows();
2206
2207            // 0,1 are f16
2208            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2209            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2210            assert_eq!(f16_col.len(), f16_bss.len());
2211            f16_col
2212                .iter()
2213                .zip(f16_bss.iter())
2214                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2215
2216            // 2,3 are f32
2217            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2218            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2219            assert_eq!(f32_col.len(), f32_bss.len());
2220            f32_col
2221                .iter()
2222                .zip(f32_bss.iter())
2223                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2224
2225            // 4,5 are f64
2226            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2227            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2228            assert_eq!(f64_col.len(), f64_bss.len());
2229            f64_col
2230                .iter()
2231                .zip(f64_bss.iter())
2232                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2233
2234            // 6,7 are i32
2235            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2236            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2237            assert_eq!(i32_col.len(), i32_bss.len());
2238            i32_col
2239                .iter()
2240                .zip(i32_bss.iter())
2241                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2242
2243            // 8,9 are i64
2244            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2245            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2246            assert_eq!(i64_col.len(), i64_bss.len());
2247            i64_col
2248                .iter()
2249                .zip(i64_bss.iter())
2250                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2251
2252            // 10,11 are FLBA(5)
2253            let flba_col = batch.column(10).as_fixed_size_binary();
2254            let flba_bss = batch.column(11).as_fixed_size_binary();
2255            assert_eq!(flba_col.len(), flba_bss.len());
2256            flba_col
2257                .iter()
2258                .zip(flba_bss.iter())
2259                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2260
2261            // 12,13 are FLBA(4) (decimal(7,3))
2262            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2263            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2264            assert_eq!(dec_col.len(), dec_bss.len());
2265            dec_col
2266                .iter()
2267                .zip(dec_bss.iter())
2268                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2269        }
2270        assert_eq!(row_count, 200);
2271    }
2272
2273    #[test]
2274    fn test_read_incorrect_map_schema_file() {
2275        let testdata = arrow::util::test_util::parquet_test_data();
2276        // see https://github.com/apache/parquet-testing/pull/47
2277        let path = format!("{testdata}/incorrect_map_schema.parquet");
2278        let file = File::open(path).unwrap();
2279        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2280
2281        let batch = record_reader.next().unwrap().unwrap();
2282        assert_eq!(batch.num_rows(), 1);
2283
2284        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2285            "my_map",
2286            ArrowDataType::Map(
2287                Arc::new(Field::new(
2288                    "key_value",
2289                    ArrowDataType::Struct(Fields::from(vec![
2290                        Field::new("key", ArrowDataType::Utf8, false),
2291                        Field::new("value", ArrowDataType::Utf8, true),
2292                    ])),
2293                    false,
2294                )),
2295                false,
2296            ),
2297            true,
2298        )]));
2299        assert_eq!(batch.schema().as_ref(), &expected_schema);
2300
2301        assert_eq!(batch.num_rows(), 1);
2302        assert_eq!(batch.column(0).null_count(), 0);
2303        assert_eq!(
2304            batch.column(0).as_map().keys().as_ref(),
2305            &StringArray::from(vec!["parent", "name"])
2306        );
2307        assert_eq!(
2308            batch.column(0).as_map().values().as_ref(),
2309            &StringArray::from(vec!["another", "report"])
2310        );
2311    }
2312
/// Round-trips a nullable `Dictionary(UInt8, FixedSizeBinary(8))` column
/// through `ArrowWriter` and `ParquetRecordBatchReader` and checks that the
/// batch read back equals the batch that was written.
#[test]
fn test_read_dict_fixed_size_binary() {
    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::UInt8),
        Box::new(ArrowDataType::FixedSizeBinary(8)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    // Two distinct 8-byte dictionary entries; keys reference the first twice.
    let dictionary_entries: Vec<Vec<u8>> = vec![(0u8..8u8).collect(), (24u8..32u8).collect()];
    let values = FixedSizeBinaryArray::try_from_iter(dictionary_entries.into_iter()).unwrap();
    let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
    let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

    // Write the batch into an in-memory parquet file
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Batch size 3 matches the number of rows written, so one batch is expected
    let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3).unwrap();
    let read: Vec<RecordBatch> = reader.collect::<Result<_, _>>().unwrap();

    assert_eq!(read.len(), 1);
    assert_eq!(&batch, &read[0])
}
2347
/// Parameters for `single_column_reader_test`
#[derive(Clone)]
struct TestOptions {
    /// Number of row groups to write to parquet (total rows written =
    /// `num_row_groups * num_rows`)
    num_row_groups: usize,
    /// Number of rows written to each row group
    num_rows: usize,
    /// Size of batches to read back
    record_batch_size: usize,
    /// Percentage of nulls in column or None if required
    null_percent: Option<usize>,
    /// Set write batch size
    ///
    /// This is the number of rows that are written at once to a page and
    /// therefore acts as a bound on the page granularity of a row group
    write_batch_size: usize,
    /// Maximum size of page in bytes
    max_data_page_size: usize,
    /// Maximum size of dictionary page in bytes
    max_dict_page_size: usize,
    /// Writer version
    writer_version: WriterVersion,
    /// Enabled statistics
    enabled_statistics: EnabledStatistics,
    /// Encoding
    encoding: Encoding,
    /// Row selection to apply when reading, together with the total number
    /// of rows it selects
    row_selections: Option<(RowSelection, usize)>,
    /// Row filter: one flag per candidate row, `true` keeps the row
    row_filter: Option<Vec<bool>>,
    /// Limit passed to the reader builder (`with_limit`)
    limit: Option<usize>,
    /// Offset passed to the reader builder (`with_offset`)
    offset: Option<usize>,
}
2384
2385    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
2386    impl std::fmt::Debug for TestOptions {
2387        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2388            f.debug_struct("TestOptions")
2389                .field("num_row_groups", &self.num_row_groups)
2390                .field("num_rows", &self.num_rows)
2391                .field("record_batch_size", &self.record_batch_size)
2392                .field("null_percent", &self.null_percent)
2393                .field("write_batch_size", &self.write_batch_size)
2394                .field("max_data_page_size", &self.max_data_page_size)
2395                .field("max_dict_page_size", &self.max_dict_page_size)
2396                .field("writer_version", &self.writer_version)
2397                .field("enabled_statistics", &self.enabled_statistics)
2398                .field("encoding", &self.encoding)
2399                .field("row_selections", &self.row_selections.is_some())
2400                .field("row_filter", &self.row_filter.is_some())
2401                .field("limit", &self.limit)
2402                .field("offset", &self.offset)
2403                .finish()
2404        }
2405    }
2406
2407    impl Default for TestOptions {
2408        fn default() -> Self {
2409            Self {
2410                num_row_groups: 2,
2411                num_rows: 100,
2412                record_batch_size: 15,
2413                null_percent: None,
2414                write_batch_size: 64,
2415                max_data_page_size: 1024 * 1024,
2416                max_dict_page_size: 1024 * 1024,
2417                writer_version: WriterVersion::PARQUET_1_0,
2418                enabled_statistics: EnabledStatistics::Page,
2419                encoding: Encoding::PLAIN,
2420                row_selections: None,
2421                row_filter: None,
2422                limit: None,
2423                offset: None,
2424            }
2425        }
2426    }
2427
2428    impl TestOptions {
2429        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2430            Self {
2431                num_row_groups,
2432                num_rows,
2433                record_batch_size,
2434                ..Default::default()
2435            }
2436        }
2437
2438        fn with_null_percent(self, null_percent: usize) -> Self {
2439            Self {
2440                null_percent: Some(null_percent),
2441                ..self
2442            }
2443        }
2444
2445        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2446            Self {
2447                max_data_page_size,
2448                ..self
2449            }
2450        }
2451
2452        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2453            Self {
2454                max_dict_page_size,
2455                ..self
2456            }
2457        }
2458
2459        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2460            Self {
2461                enabled_statistics,
2462                ..self
2463            }
2464        }
2465
2466        fn with_row_selections(self) -> Self {
2467            assert!(self.row_filter.is_none(), "Must set row selection first");
2468
2469            let mut rng = rng();
2470            let step = rng.random_range(self.record_batch_size..self.num_rows);
2471            let row_selections = create_test_selection(
2472                step,
2473                self.num_row_groups * self.num_rows,
2474                rng.random::<bool>(),
2475            );
2476            Self {
2477                row_selections: Some(row_selections),
2478                ..self
2479            }
2480        }
2481
2482        fn with_row_filter(self) -> Self {
2483            let row_count = match &self.row_selections {
2484                Some((_, count)) => *count,
2485                None => self.num_row_groups * self.num_rows,
2486            };
2487
2488            let mut rng = rng();
2489            Self {
2490                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2491                ..self
2492            }
2493        }
2494
2495        fn with_limit(self, limit: usize) -> Self {
2496            Self {
2497                limit: Some(limit),
2498                ..self
2499            }
2500        }
2501
2502        fn with_offset(self, offset: usize) -> Self {
2503            Self {
2504                offset: Some(offset),
2505                ..self
2506            }
2507        }
2508
2509        fn writer_props(&self) -> WriterProperties {
2510            let builder = WriterProperties::builder()
2511                .set_data_page_size_limit(self.max_data_page_size)
2512                .set_write_batch_size(self.write_batch_size)
2513                .set_writer_version(self.writer_version)
2514                .set_statistics_enabled(self.enabled_statistics);
2515
2516            let builder = match self.encoding {
2517                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2518                    .set_dictionary_enabled(true)
2519                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2520                _ => builder
2521                    .set_dictionary_enabled(false)
2522                    .set_encoding(self.encoding),
2523            };
2524
2525            builder.build()
2526        }
2527    }
2528
2529    /// Create a parquet file and then read it using
2530    /// `ParquetFileArrowReader` using a standard set of parameters
2531    /// `opts`.
2532    ///
2533    /// `rand_max` represents the maximum size of value to pass to to
2534    /// value generator
2535    fn run_single_column_reader_tests<T, F, G>(
2536        rand_max: i32,
2537        converted_type: ConvertedType,
2538        arrow_type: Option<ArrowDataType>,
2539        converter: F,
2540        encodings: &[Encoding],
2541    ) where
2542        T: DataType,
2543        G: RandGen<T>,
2544        F: Fn(&[Option<T::T>]) -> ArrayRef,
2545    {
2546        let all_options = vec![
2547            // choose record_batch_batch (15) so batches cross row
2548            // group boundaries (50 rows in 2 row groups) cases.
2549            TestOptions::new(2, 100, 15),
2550            // choose record_batch_batch (5) so batches sometime fall
2551            // on row group boundaries and (25 rows in 3 row groups
2552            // --> row groups of 10, 10, and 5). Tests buffer
2553            // refilling edge cases.
2554            TestOptions::new(3, 25, 5),
2555            // Choose record_batch_size (25) so all batches fall
2556            // exactly on row group boundary (25). Tests buffer
2557            // refilling edge cases.
2558            TestOptions::new(4, 100, 25),
2559            // Set maximum page size so row groups have multiple pages
2560            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2561            // Set small dictionary page size to test dictionary fallback
2562            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2563            // Test optional but with no nulls
2564            TestOptions::new(2, 256, 127).with_null_percent(0),
2565            // Test optional with nulls
2566            TestOptions::new(2, 256, 93).with_null_percent(25),
2567            // Test with limit of 0
2568            TestOptions::new(4, 100, 25).with_limit(0),
2569            // Test with limit of 50
2570            TestOptions::new(4, 100, 25).with_limit(50),
2571            // Test with limit equal to number of rows
2572            TestOptions::new(4, 100, 25).with_limit(10),
2573            // Test with limit larger than number of rows
2574            TestOptions::new(4, 100, 25).with_limit(101),
2575            // Test with limit + offset equal to number of rows
2576            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2577            // Test with limit + offset equal to number of rows
2578            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2579            // Test with limit + offset larger than number of rows
2580            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2581            // Test with no page-level statistics
2582            TestOptions::new(2, 256, 91)
2583                .with_null_percent(25)
2584                .with_enabled_statistics(EnabledStatistics::Chunk),
2585            // Test with no statistics
2586            TestOptions::new(2, 256, 91)
2587                .with_null_percent(25)
2588                .with_enabled_statistics(EnabledStatistics::None),
2589            // Test with all null
2590            TestOptions::new(2, 128, 91)
2591                .with_null_percent(100)
2592                .with_enabled_statistics(EnabledStatistics::None),
2593            // Test skip
2594
2595            // choose record_batch_batch (15) so batches cross row
2596            // group boundaries (50 rows in 2 row groups) cases.
2597            TestOptions::new(2, 100, 15).with_row_selections(),
2598            // choose record_batch_batch (5) so batches sometime fall
2599            // on row group boundaries and (25 rows in 3 row groups
2600            // --> row groups of 10, 10, and 5). Tests buffer
2601            // refilling edge cases.
2602            TestOptions::new(3, 25, 5).with_row_selections(),
2603            // Choose record_batch_size (25) so all batches fall
2604            // exactly on row group boundary (25). Tests buffer
2605            // refilling edge cases.
2606            TestOptions::new(4, 100, 25).with_row_selections(),
2607            // Set maximum page size so row groups have multiple pages
2608            TestOptions::new(3, 256, 73)
2609                .with_max_data_page_size(128)
2610                .with_row_selections(),
2611            // Set small dictionary page size to test dictionary fallback
2612            TestOptions::new(3, 256, 57)
2613                .with_max_dict_page_size(128)
2614                .with_row_selections(),
2615            // Test optional but with no nulls
2616            TestOptions::new(2, 256, 127)
2617                .with_null_percent(0)
2618                .with_row_selections(),
2619            // Test optional with nulls
2620            TestOptions::new(2, 256, 93)
2621                .with_null_percent(25)
2622                .with_row_selections(),
2623            // Test optional with nulls
2624            TestOptions::new(2, 256, 93)
2625                .with_null_percent(25)
2626                .with_row_selections()
2627                .with_limit(10),
2628            // Test optional with nulls
2629            TestOptions::new(2, 256, 93)
2630                .with_null_percent(25)
2631                .with_row_selections()
2632                .with_offset(20)
2633                .with_limit(10),
2634            // Test filter
2635
2636            // Test with row filter
2637            TestOptions::new(4, 100, 25).with_row_filter(),
2638            // Test with row selection and row filter
2639            TestOptions::new(4, 100, 25)
2640                .with_row_selections()
2641                .with_row_filter(),
2642            // Test with nulls and row filter
2643            TestOptions::new(2, 256, 93)
2644                .with_null_percent(25)
2645                .with_max_data_page_size(10)
2646                .with_row_filter(),
2647            // Test with nulls and row filter and small pages
2648            TestOptions::new(2, 256, 93)
2649                .with_null_percent(25)
2650                .with_max_data_page_size(10)
2651                .with_row_selections()
2652                .with_row_filter(),
2653            // Test with row selection and no offset index and small pages
2654            TestOptions::new(2, 256, 93)
2655                .with_enabled_statistics(EnabledStatistics::None)
2656                .with_max_data_page_size(10)
2657                .with_row_selections(),
2658        ];
2659
2660        all_options.into_iter().for_each(|opts| {
2661            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2662                for encoding in encodings {
2663                    let opts = TestOptions {
2664                        writer_version,
2665                        encoding: *encoding,
2666                        ..opts.clone()
2667                    };
2668
2669                    single_column_reader_test::<T, _, G>(
2670                        opts,
2671                        rand_max,
2672                        converted_type,
2673                        arrow_type.clone(),
2674                        &converter,
2675                    )
2676                }
2677            }
2678        });
2679    }
2680
2681    /// Create a parquet file and then read it using
2682    /// `ParquetFileArrowReader` using the parameters described in
2683    /// `opts`.
2684    fn single_column_reader_test<T, F, G>(
2685        opts: TestOptions,
2686        rand_max: i32,
2687        converted_type: ConvertedType,
2688        arrow_type: Option<ArrowDataType>,
2689        converter: F,
2690    ) where
2691        T: DataType,
2692        G: RandGen<T>,
2693        F: Fn(&[Option<T::T>]) -> ArrayRef,
2694    {
2695        // Print out options to facilitate debugging failures on CI
2696        println!(
2697            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2698            T::get_physical_type(), converted_type, arrow_type, opts
2699        );
2700
2701        //according to null_percent generate def_levels
2702        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2703            Some(null_percent) => {
2704                let mut rng = rng();
2705
2706                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2707                    .map(|_| {
2708                        std::iter::from_fn(|| {
2709                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2710                        })
2711                        .take(opts.num_rows)
2712                        .collect()
2713                    })
2714                    .collect();
2715                (Repetition::OPTIONAL, Some(def_levels))
2716            }
2717            None => (Repetition::REQUIRED, None),
2718        };
2719
2720        //generate random table data
2721        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2722            .map(|idx| {
2723                let null_count = match def_levels.as_ref() {
2724                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2725                    None => 0,
2726                };
2727                G::gen_vec(rand_max, opts.num_rows - null_count)
2728            })
2729            .collect();
2730
2731        let len = match T::get_physical_type() {
2732            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2733            crate::basic::Type::INT96 => 12,
2734            _ => -1,
2735        };
2736
2737        let fields = vec![Arc::new(
2738            Type::primitive_type_builder("leaf", T::get_physical_type())
2739                .with_repetition(repetition)
2740                .with_converted_type(converted_type)
2741                .with_length(len)
2742                .build()
2743                .unwrap(),
2744        )];
2745
2746        let schema = Arc::new(
2747            Type::group_type_builder("test_schema")
2748                .with_fields(fields)
2749                .build()
2750                .unwrap(),
2751        );
2752
2753        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2754
2755        let mut file = tempfile::tempfile().unwrap();
2756
2757        generate_single_column_file_with_data::<T>(
2758            &values,
2759            def_levels.as_ref(),
2760            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2761            schema,
2762            arrow_field,
2763            &opts,
2764        )
2765        .unwrap();
2766
2767        file.rewind().unwrap();
2768
2769        let options = ArrowReaderOptions::new()
2770            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2771
2772        let mut builder =
2773            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2774
2775        let expected_data = match opts.row_selections {
2776            Some((selections, row_count)) => {
2777                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2778
2779                let mut skip_data: Vec<Option<T::T>> = vec![];
2780                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2781                for select in dequeue {
2782                    if select.skip {
2783                        without_skip_data.drain(0..select.row_count);
2784                    } else {
2785                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2786                    }
2787                }
2788                builder = builder.with_row_selection(selections);
2789
2790                assert_eq!(skip_data.len(), row_count);
2791                skip_data
2792            }
2793            None => {
2794                //get flatten table data
2795                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2796                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2797                expected_data
2798            }
2799        };
2800
2801        let mut expected_data = match opts.row_filter {
2802            Some(filter) => {
2803                let expected_data = expected_data
2804                    .into_iter()
2805                    .zip(filter.iter())
2806                    .filter_map(|(d, f)| f.then(|| d))
2807                    .collect();
2808
2809                let mut filter_offset = 0;
2810                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2811                    ProjectionMask::all(),
2812                    move |b| {
2813                        let array = BooleanArray::from_iter(
2814                            filter
2815                                .iter()
2816                                .skip(filter_offset)
2817                                .take(b.num_rows())
2818                                .map(|x| Some(*x)),
2819                        );
2820                        filter_offset += b.num_rows();
2821                        Ok(array)
2822                    },
2823                ))]);
2824
2825                builder = builder.with_row_filter(filter);
2826                expected_data
2827            }
2828            None => expected_data,
2829        };
2830
2831        if let Some(offset) = opts.offset {
2832            builder = builder.with_offset(offset);
2833            expected_data = expected_data.into_iter().skip(offset).collect();
2834        }
2835
2836        if let Some(limit) = opts.limit {
2837            builder = builder.with_limit(limit);
2838            expected_data = expected_data.into_iter().take(limit).collect();
2839        }
2840
2841        let mut record_reader = builder
2842            .with_batch_size(opts.record_batch_size)
2843            .build()
2844            .unwrap();
2845
2846        let mut total_read = 0;
2847        loop {
2848            let maybe_batch = record_reader.next();
2849            if total_read < expected_data.len() {
2850                let end = min(total_read + opts.record_batch_size, expected_data.len());
2851                let batch = maybe_batch.unwrap().unwrap();
2852                assert_eq!(end - total_read, batch.num_rows());
2853
2854                let a = converter(&expected_data[total_read..end]);
2855                let b = Arc::clone(batch.column(0));
2856
2857                assert_eq!(a.data_type(), b.data_type());
2858                assert_eq!(a.to_data(), b.to_data());
2859                assert_eq!(
2860                    a.as_any().type_id(),
2861                    b.as_any().type_id(),
2862                    "incorrect type ids"
2863                );
2864
2865                total_read = end;
2866            } else {
2867                assert!(maybe_batch.is_none());
2868                break;
2869            }
2870        }
2871    }
2872
2873    fn gen_expected_data<T: DataType>(
2874        def_levels: Option<&Vec<Vec<i16>>>,
2875        values: &[Vec<T::T>],
2876    ) -> Vec<Option<T::T>> {
2877        let data: Vec<Option<T::T>> = match def_levels {
2878            Some(levels) => {
2879                let mut values_iter = values.iter().flatten();
2880                levels
2881                    .iter()
2882                    .flatten()
2883                    .map(|d| match d {
2884                        1 => Some(values_iter.next().cloned().unwrap()),
2885                        0 => None,
2886                        _ => unreachable!(),
2887                    })
2888                    .collect()
2889            }
2890            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2891        };
2892        data
2893    }
2894
2895    fn generate_single_column_file_with_data<T: DataType>(
2896        values: &[Vec<T::T>],
2897        def_levels: Option<&Vec<Vec<i16>>>,
2898        file: File,
2899        schema: TypePtr,
2900        field: Option<Field>,
2901        opts: &TestOptions,
2902    ) -> Result<crate::format::FileMetaData> {
2903        let mut writer_props = opts.writer_props();
2904        if let Some(field) = field {
2905            let arrow_schema = Schema::new(vec![field]);
2906            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2907        }
2908
2909        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2910
2911        for (idx, v) in values.iter().enumerate() {
2912            let def_levels = def_levels.map(|d| d[idx].as_slice());
2913            let mut row_group_writer = writer.next_row_group()?;
2914            {
2915                let mut column_writer = row_group_writer
2916                    .next_column()?
2917                    .expect("Column writer is none!");
2918
2919                column_writer
2920                    .typed::<T>()
2921                    .write_batch(v, def_levels, None)?;
2922
2923                column_writer.close()?;
2924            }
2925            row_group_writer.close()?;
2926        }
2927
2928        writer.close()
2929    }
2930
2931    fn get_test_file(file_name: &str) -> File {
2932        let mut path = PathBuf::new();
2933        path.push(arrow::util::test_util::arrow_test_data());
2934        path.push(file_name);
2935
2936        File::open(path.as_path()).expect("File not found!")
2937    }
2938
/// Reads `nested_structs.rust.parquet` in full, then again with a projection
/// built from leaf indices, and checks the projected schema.
///
/// This particular test file has columns of struct types where there is a
/// column that has the same name as one of the struct fields
/// (see: ARROW-11452)
#[test]
fn test_read_structs() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    // Pass 1: every batch of the unprojected file must decode
    let full_file = File::open(&path).unwrap();
    let full_reader = ParquetRecordBatchReader::try_new(full_file, 60).unwrap();
    full_reader.for_each(|maybe_batch| {
        maybe_batch.unwrap();
    });

    // Pass 2: project three leaf columns by index
    let projected_file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(projected_file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "count",
        ArrowDataType::UInt64,
        false,
    )]));
    let pc_cur_type = ArrowDataType::Struct(Fields::from(vec![
        Field::new("mean", ArrowDataType::Int64, false),
        Field::new("sum", ArrowDataType::Int64, false),
    ]));
    let expected_schema = Schema::new(vec![
        Field::new("roll_num", roll_num_type, false),
        Field::new("PC_CUR", pc_cur_type, false),
    ]);

    // Tests for #1652 and #1654
    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
2991
/// Same as `test_read_structs` but constructs the projection mask via column
/// names instead of leaf indices.
#[test]
fn test_read_structs_by_name() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    // Pass 1: every batch of the unprojected file must decode
    let full_file = File::open(&path).unwrap();
    let full_reader = ParquetRecordBatchReader::try_new(full_file, 60).unwrap();
    full_reader.for_each(|maybe_batch| {
        maybe_batch.unwrap();
    });

    // Pass 2: project three leaf columns by their dotted column paths
    let projected_file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(projected_file).unwrap();
    let column_names = ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"];
    let mask = ProjectionMask::columns(builder.parquet_schema(), column_names);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "count",
        ArrowDataType::UInt64,
        false,
    )]));
    let pc_cur_type = ArrowDataType::Struct(Fields::from(vec![
        Field::new("mean", ArrowDataType::Int64, false),
        Field::new("sum", ArrowDataType::Int64, false),
    ]));
    let expected_schema = Schema::new(vec![
        Field::new("roll_num", roll_num_type, false),
        Field::new("PC_CUR", pc_cur_type, false),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
3044
/// Checks that every batch of `nested_maps.snappy.parquet` can be read
/// without error.
#[test]
fn test_read_maps() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/nested_maps.snappy.parquet")).unwrap();

    ParquetRecordBatchReader::try_new(file, 60)
        .unwrap()
        .for_each(|maybe_batch| {
            maybe_batch.unwrap();
        });
}
3056
/// Writes an optional group with a required INT32 leaf using the low-level
/// writer, then checks that the arrow reader reports a nullable struct with
/// a non-nullable child and the expected number of null rows.
#[test]
fn test_nested_nullability() {
    let message_type = "message nested {\n          OPTIONAL Group group {\n            REQUIRED INT32 leaf;\n          }\n        }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Write using low-level parquet API (#1167)
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default()).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

    // Four rows: definition level 0 marks a null group, 1 carries a value
    column_writer
        .typed::<Int32Type>()
        .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
        .unwrap();

    column_writer.close().unwrap();
    row_group_writer.close().unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut reader = builder.with_projection(mask).build().unwrap();

    let leaf = Field::new("leaf", ArrowDataType::Int32, false);
    let group = Field::new("group", ArrowDataType::Struct(vec![leaf].into()), true);
    let expected_schema = Schema::new(Fields::from(vec![group]));

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}
3106
/// Reading a string column that contains invalid UTF-8 must surface an error
/// mentioning the invalid sequence rather than producing a batch.
#[test]
fn test_invalid_utf8() {
    // a parquet file with 1 column with invalid utf8
    let data: Vec<u8> = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
        18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
        3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
        2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
        111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
        21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
        38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
        38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
        0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
        118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
        116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
        82, 49,
    ];

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(data), 10).unwrap();

    // The first (and only) batch must fail to decode
    let error = reader.next().unwrap().unwrap_err();
    let message = error.to_string();

    assert!(message.contains("invalid utf-8 sequence"), "{}", error);
}
3136
    /// Runs the shared invalid UTF-8 round trip check with `i32` offsets.
    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }
3141
    /// Runs the shared invalid UTF-8 round trip check with `i64` offsets.
    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }
3146
3147    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3148        let cases = [
3149            invalid_utf8_first_char::<O>(),
3150            invalid_utf8_first_char_long_strings::<O>(),
3151            invalid_utf8_later_char::<O>(),
3152            invalid_utf8_later_char_long_strings::<O>(),
3153            invalid_utf8_later_char_really_long_strings::<O>(),
3154            invalid_utf8_later_char_really_long_strings2::<O>(),
3155        ];
3156        for array in &cases {
3157            for encoding in STRING_ENCODINGS {
3158                // data is not valid utf8 we can not construct a correct StringArray
3159                // safely, so purposely create an invalid StringArray
3160                let array = unsafe {
3161                    GenericStringArray::<O>::new_unchecked(
3162                        array.offsets().clone(),
3163                        array.values().clone(),
3164                        array.nulls().cloned(),
3165                    )
3166                };
3167                let data_type = array.data_type().clone();
3168                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3169                let err = read_from_parquet(data).unwrap_err();
3170                let expected_err =
3171                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3172                assert!(
3173                    err.to_string().contains(expected_err),
3174                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3175                );
3176            }
3177        }
3178    }
3179
3180    #[test]
3181    fn test_invalid_utf8_string_view_array() {
3182        let cases = [
3183            invalid_utf8_first_char::<i32>(),
3184            invalid_utf8_first_char_long_strings::<i32>(),
3185            invalid_utf8_later_char::<i32>(),
3186            invalid_utf8_later_char_long_strings::<i32>(),
3187            invalid_utf8_later_char_really_long_strings::<i32>(),
3188            invalid_utf8_later_char_really_long_strings2::<i32>(),
3189        ];
3190
3191        for encoding in STRING_ENCODINGS {
3192            for array in &cases {
3193                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3194                let array = array.as_binary_view();
3195
3196                // data is not valid utf8 we can not construct a correct StringArray
3197                // safely, so purposely create an invalid StringViewArray
3198                let array = unsafe {
3199                    StringViewArray::new_unchecked(
3200                        array.views().clone(),
3201                        array.data_buffers().to_vec(),
3202                        array.nulls().cloned(),
3203                    )
3204                };
3205
3206                let data_type = array.data_type().clone();
3207                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3208                let err = read_from_parquet(data).unwrap_err();
3209                let expected_err =
3210                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3211                assert!(
3212                    err.to_string().contains(expected_err),
3213                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3214                );
3215            }
3216        }
3217    }
3218
    /// Encodings suitable for string data
    ///
    /// `None` requests no explicit encoding: `write_to_parquet_with_encoding`
    /// then hands no writer properties to the writer at all.
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];
3226
3227    /// Invalid Utf-8 sequence in the first character
3228    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3229    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3230
3231    /// Invalid Utf=8 sequence in NOT the first character
3232    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3233    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3234
3235    /// returns a BinaryArray with invalid UTF8 data in the first character
3236    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3237        let valid: &[u8] = b"   ";
3238        let invalid = INVALID_UTF8_FIRST_CHAR;
3239        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3240    }
3241
3242    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3243    /// string larger than 12 bytes which is handled specially when reading
3244    /// `ByteViewArray`s
3245    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3246        let valid: &[u8] = b"   ";
3247        let mut invalid = vec![];
3248        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3249        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3250        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3251    }
3252
3253    /// returns a BinaryArray with invalid UTF8 data in a character other than
3254    /// the first (this is checked in a special codepath)
3255    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3256        let valid: &[u8] = b"   ";
3257        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3258        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3259    }
3260
3261    /// returns a BinaryArray with invalid UTF8 data in a character other than
3262    /// the first in a string larger than 12 bytes which is handled specially
3263    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3264    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3265        let valid: &[u8] = b"   ";
3266        let mut invalid = vec![];
3267        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3268        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3269        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3270    }
3271
3272    /// returns a BinaryArray with invalid UTF8 data in a character other than
3273    /// the first in a string larger than 128 bytes which is handled specially
3274    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3275    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3276        let valid: &[u8] = b"   ";
3277        let mut invalid = vec![];
3278        for _ in 0..10 {
3279            // each instance is 38 bytes
3280            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3281        }
3282        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3283        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3284    }
3285
3286    /// returns a BinaryArray with small invalid UTF8 data followed by a large
3287    /// invalid UTF8 data in a character other than the first in a string larger
3288    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3289        let valid: &[u8] = b"   ";
3290        let mut valid_long = vec![];
3291        for _ in 0..10 {
3292            // each instance is 38 bytes
3293            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3294        }
3295        let invalid = INVALID_UTF8_LATER_CHAR;
3296        GenericBinaryArray::<O>::from_iter(vec![
3297            None,
3298            Some(valid),
3299            Some(invalid),
3300            None,
3301            Some(&valid_long),
3302            Some(valid),
3303        ])
3304    }
3305
3306    /// writes the array into a single column parquet file with the specified
3307    /// encoding.
3308    ///
3309    /// If no encoding is specified, use default (dictionary) encoding
3310    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3311        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3312        let mut data = vec![];
3313        let schema = batch.schema();
3314        let props = encoding.map(|encoding| {
3315            WriterProperties::builder()
3316                // must disable dictionary encoding to actually use encoding
3317                .set_dictionary_enabled(false)
3318                .set_encoding(encoding)
3319                .build()
3320        });
3321
3322        {
3323            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3324            writer.write(&batch).unwrap();
3325            writer.flush().unwrap();
3326            writer.close().unwrap();
3327        };
3328        data
3329    }
3330
3331    /// read the parquet file into a record batch
3332    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3333        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3334            .unwrap()
3335            .build()
3336            .unwrap();
3337
3338        reader.collect()
3339    }
3340
3341    #[test]
3342    fn test_dictionary_preservation() {
3343        let fields = vec![Arc::new(
3344            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3345                .with_repetition(Repetition::OPTIONAL)
3346                .with_converted_type(ConvertedType::UTF8)
3347                .build()
3348                .unwrap(),
3349        )];
3350
3351        let schema = Arc::new(
3352            Type::group_type_builder("test_schema")
3353                .with_fields(fields)
3354                .build()
3355                .unwrap(),
3356        );
3357
3358        let dict_type = ArrowDataType::Dictionary(
3359            Box::new(ArrowDataType::Int32),
3360            Box::new(ArrowDataType::Utf8),
3361        );
3362
3363        let arrow_field = Field::new("leaf", dict_type, true);
3364
3365        let mut file = tempfile::tempfile().unwrap();
3366
3367        let values = vec![
3368            vec![
3369                ByteArray::from("hello"),
3370                ByteArray::from("a"),
3371                ByteArray::from("b"),
3372                ByteArray::from("d"),
3373            ],
3374            vec![
3375                ByteArray::from("c"),
3376                ByteArray::from("a"),
3377                ByteArray::from("b"),
3378            ],
3379        ];
3380
3381        let def_levels = vec![
3382            vec![1, 0, 0, 1, 0, 0, 1, 1],
3383            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3384        ];
3385
3386        let opts = TestOptions {
3387            encoding: Encoding::RLE_DICTIONARY,
3388            ..Default::default()
3389        };
3390
3391        generate_single_column_file_with_data::<ByteArrayType>(
3392            &values,
3393            Some(&def_levels),
3394            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3395            schema,
3396            Some(arrow_field),
3397            &opts,
3398        )
3399        .unwrap();
3400
3401        file.rewind().unwrap();
3402
3403        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3404
3405        let batches = record_reader
3406            .collect::<Result<Vec<RecordBatch>, _>>()
3407            .unwrap();
3408
3409        assert_eq!(batches.len(), 6);
3410        assert!(batches.iter().all(|x| x.num_columns() == 1));
3411
3412        let row_counts = batches
3413            .iter()
3414            .map(|x| (x.num_rows(), x.column(0).null_count()))
3415            .collect::<Vec<_>>();
3416
3417        assert_eq!(
3418            row_counts,
3419            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3420        );
3421
3422        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3423
3424        // First and second batch in same row group -> same dictionary
3425        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3426        // Third batch spans row group -> computed dictionary
3427        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3428        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3429        // Fourth, fifth and sixth from same row group -> same dictionary
3430        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3431        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3432    }
3433
3434    #[test]
3435    fn test_read_null_list() {
3436        let testdata = arrow::util::test_util::parquet_test_data();
3437        let path = format!("{testdata}/null_list.parquet");
3438        let file = File::open(path).unwrap();
3439        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3440
3441        let batch = record_batch_reader.next().unwrap().unwrap();
3442        assert_eq!(batch.num_rows(), 1);
3443        assert_eq!(batch.num_columns(), 1);
3444        assert_eq!(batch.column(0).len(), 1);
3445
3446        let list = batch
3447            .column(0)
3448            .as_any()
3449            .downcast_ref::<ListArray>()
3450            .unwrap();
3451        assert_eq!(list.len(), 1);
3452        assert!(list.is_valid(0));
3453
3454        let val = list.value(0);
3455        assert_eq!(val.len(), 0);
3456    }
3457
3458    #[test]
3459    fn test_null_schema_inference() {
3460        let testdata = arrow::util::test_util::parquet_test_data();
3461        let path = format!("{testdata}/null_list.parquet");
3462        let file = File::open(path).unwrap();
3463
3464        let arrow_field = Field::new(
3465            "emptylist",
3466            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3467            true,
3468        );
3469
3470        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3471        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3472        let schema = builder.schema();
3473        assert_eq!(schema.fields().len(), 1);
3474        assert_eq!(schema.field(0), &arrow_field);
3475    }
3476
3477    #[test]
3478    fn test_skip_metadata() {
3479        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3480        let field = Field::new("col", col.data_type().clone(), true);
3481
3482        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3483
3484        let metadata = [("key".to_string(), "value".to_string())]
3485            .into_iter()
3486            .collect();
3487
3488        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3489
3490        assert_ne!(schema_with_metadata, schema_without_metadata);
3491
3492        let batch =
3493            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3494
3495        let file = |version: WriterVersion| {
3496            let props = WriterProperties::builder()
3497                .set_writer_version(version)
3498                .build();
3499
3500            let file = tempfile().unwrap();
3501            let mut writer =
3502                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3503                    .unwrap();
3504            writer.write(&batch).unwrap();
3505            writer.close().unwrap();
3506            file
3507        };
3508
3509        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3510
3511        let v1_reader = file(WriterVersion::PARQUET_1_0);
3512        let v2_reader = file(WriterVersion::PARQUET_2_0);
3513
3514        let arrow_reader =
3515            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3516        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3517
3518        let reader =
3519            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3520                .unwrap()
3521                .build()
3522                .unwrap();
3523        assert_eq!(reader.schema(), schema_without_metadata);
3524
3525        let arrow_reader =
3526            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3527        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3528
3529        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3530            .unwrap()
3531            .build()
3532            .unwrap();
3533        assert_eq!(reader.schema(), schema_without_metadata);
3534    }
3535
3536    fn write_parquet_from_iter<I, F>(value: I) -> File
3537    where
3538        I: IntoIterator<Item = (F, ArrayRef)>,
3539        F: AsRef<str>,
3540    {
3541        let batch = RecordBatch::try_from_iter(value).unwrap();
3542        let file = tempfile().unwrap();
3543        let mut writer =
3544            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3545        writer.write(&batch).unwrap();
3546        writer.close().unwrap();
3547        file
3548    }
3549
3550    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3551    where
3552        I: IntoIterator<Item = (F, ArrayRef)>,
3553        F: AsRef<str>,
3554    {
3555        let file = write_parquet_from_iter(value);
3556        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3557        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3558            file.try_clone().unwrap(),
3559            options_with_schema,
3560        );
3561        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3562    }
3563
3564    #[test]
3565    fn test_schema_too_few_columns() {
3566        run_schema_test_with_error(
3567            vec![
3568                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3569                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3570            ],
3571            Arc::new(Schema::new(vec![Field::new(
3572                "int64",
3573                ArrowDataType::Int64,
3574                false,
3575            )])),
3576            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3577        );
3578    }
3579
3580    #[test]
3581    fn test_schema_too_many_columns() {
3582        run_schema_test_with_error(
3583            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3584            Arc::new(Schema::new(vec![
3585                Field::new("int64", ArrowDataType::Int64, false),
3586                Field::new("int32", ArrowDataType::Int32, false),
3587            ])),
3588            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3589        );
3590    }
3591
3592    #[test]
3593    fn test_schema_mismatched_column_names() {
3594        run_schema_test_with_error(
3595            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3596            Arc::new(Schema::new(vec![Field::new(
3597                "other",
3598                ArrowDataType::Int64,
3599                false,
3600            )])),
3601            "Arrow: incompatible arrow schema, expected field named int64 got other",
3602        );
3603    }
3604
3605    #[test]
3606    fn test_schema_incompatible_columns() {
3607        run_schema_test_with_error(
3608            vec![
3609                (
3610                    "col1_invalid",
3611                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3612                ),
3613                (
3614                    "col2_valid",
3615                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3616                ),
3617                (
3618                    "col3_invalid",
3619                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3620                ),
3621            ],
3622            Arc::new(Schema::new(vec![
3623                Field::new("col1_invalid", ArrowDataType::Int32, false),
3624                Field::new("col2_valid", ArrowDataType::Int32, false),
3625                Field::new("col3_invalid", ArrowDataType::Int32, false),
3626            ])),
3627            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3628        );
3629    }
3630
3631    #[test]
3632    fn test_one_incompatible_nested_column() {
3633        let nested_fields = Fields::from(vec![
3634            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3635            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3636        ]);
3637        let nested = StructArray::try_new(
3638            nested_fields,
3639            vec![
3640                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3641                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3642            ],
3643            None,
3644        )
3645        .expect("struct array");
3646        let supplied_nested_fields = Fields::from(vec![
3647            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3648            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3649        ]);
3650        run_schema_test_with_error(
3651            vec![
3652                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3653                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3654                ("nested", Arc::new(nested) as ArrayRef),
3655            ],
3656            Arc::new(Schema::new(vec![
3657                Field::new("col1", ArrowDataType::Int64, false),
3658                Field::new("col2", ArrowDataType::Int32, false),
3659                Field::new(
3660                    "nested",
3661                    ArrowDataType::Struct(supplied_nested_fields),
3662                    false,
3663                ),
3664            ])),
3665            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3666            requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3667            but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3668        );
3669    }
3670
3671    /// Return parquet data with a single column of utf8 strings
3672    fn utf8_parquet() -> Bytes {
3673        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3674        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3675        let props = None;
3676        // write parquet file with non nullable strings
3677        let mut parquet_data = vec![];
3678        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3679        writer.write(&batch).unwrap();
3680        writer.close().unwrap();
3681        Bytes::from(parquet_data)
3682    }
3683
3684    #[test]
3685    fn test_schema_error_bad_types() {
3686        // verify incompatible schemas error on read
3687        let parquet_data = utf8_parquet();
3688
3689        // Ask to read it back with an incompatible schema (int vs string)
3690        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3691            "column1",
3692            arrow::datatypes::DataType::Int32,
3693            false,
3694        )]));
3695
3696        // read it back out
3697        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3698        let err =
3699            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3700                .unwrap_err();
3701        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3702    }
3703
3704    #[test]
3705    fn test_schema_error_bad_nullability() {
3706        // verify incompatible schemas error on read
3707        let parquet_data = utf8_parquet();
3708
3709        // Ask to read it back with an incompatible schema (nullability mismatch)
3710        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3711            "column1",
3712            arrow::datatypes::DataType::Utf8,
3713            true,
3714        )]));
3715
3716        // read it back out
3717        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3718        let err =
3719            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3720                .unwrap_err();
3721        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3722    }
3723
3724    #[test]
3725    fn test_read_binary_as_utf8() {
3726        let file = write_parquet_from_iter(vec![
3727            (
3728                "binary_to_utf8",
3729                Arc::new(BinaryArray::from(vec![
3730                    b"one".as_ref(),
3731                    b"two".as_ref(),
3732                    b"three".as_ref(),
3733                ])) as ArrayRef,
3734            ),
3735            (
3736                "large_binary_to_large_utf8",
3737                Arc::new(LargeBinaryArray::from(vec![
3738                    b"one".as_ref(),
3739                    b"two".as_ref(),
3740                    b"three".as_ref(),
3741                ])) as ArrayRef,
3742            ),
3743            (
3744                "binary_view_to_utf8_view",
3745                Arc::new(BinaryViewArray::from(vec![
3746                    b"one".as_ref(),
3747                    b"two".as_ref(),
3748                    b"three".as_ref(),
3749                ])) as ArrayRef,
3750            ),
3751        ]);
3752        let supplied_fields = Fields::from(vec![
3753            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3754            Field::new(
3755                "large_binary_to_large_utf8",
3756                ArrowDataType::LargeUtf8,
3757                false,
3758            ),
3759            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3760        ]);
3761
3762        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3763        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3764            file.try_clone().unwrap(),
3765            options,
3766        )
3767        .expect("reader builder with schema")
3768        .build()
3769        .expect("reader with schema");
3770
3771        let batch = arrow_reader.next().unwrap().unwrap();
3772        assert_eq!(batch.num_columns(), 3);
3773        assert_eq!(batch.num_rows(), 3);
3774        assert_eq!(
3775            batch
3776                .column(0)
3777                .as_string::<i32>()
3778                .iter()
3779                .collect::<Vec<_>>(),
3780            vec![Some("one"), Some("two"), Some("three")]
3781        );
3782
3783        assert_eq!(
3784            batch
3785                .column(1)
3786                .as_string::<i64>()
3787                .iter()
3788                .collect::<Vec<_>>(),
3789            vec![Some("one"), Some("two"), Some("three")]
3790        );
3791
3792        assert_eq!(
3793            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3794            vec![Some("one"), Some("two"), Some("three")]
3795        );
3796    }
3797
3798    #[test]
3799    #[should_panic(expected = "Invalid UTF8 sequence at")]
3800    fn test_read_non_utf8_binary_as_utf8() {
3801        let file = write_parquet_from_iter(vec![(
3802            "non_utf8_binary",
3803            Arc::new(BinaryArray::from(vec![
3804                b"\xDE\x00\xFF".as_ref(),
3805                b"\xDE\x01\xAA".as_ref(),
3806                b"\xDE\x02\xFF".as_ref(),
3807            ])) as ArrayRef,
3808        )]);
3809        let supplied_fields = Fields::from(vec![Field::new(
3810            "non_utf8_binary",
3811            ArrowDataType::Utf8,
3812            false,
3813        )]);
3814
3815        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3816        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3817            file.try_clone().unwrap(),
3818            options,
3819        )
3820        .expect("reader builder with schema")
3821        .build()
3822        .expect("reader with schema");
3823        arrow_reader.next().unwrap().unwrap_err();
3824    }
3825
3826    #[test]
3827    fn test_with_schema() {
3828        let nested_fields = Fields::from(vec![
3829            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3830            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3831        ]);
3832
3833        let nested_arrays: Vec<ArrayRef> = vec![
3834            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3835            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3836        ];
3837
3838        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3839
3840        let file = write_parquet_from_iter(vec![
3841            (
3842                "int32_to_ts_second",
3843                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3844            ),
3845            (
3846                "date32_to_date64",
3847                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3848            ),
3849            ("nested", Arc::new(nested) as ArrayRef),
3850        ]);
3851
3852        let supplied_nested_fields = Fields::from(vec![
3853            Field::new(
3854                "utf8_to_dict",
3855                ArrowDataType::Dictionary(
3856                    Box::new(ArrowDataType::Int32),
3857                    Box::new(ArrowDataType::Utf8),
3858                ),
3859                false,
3860            ),
3861            Field::new(
3862                "int64_to_ts_nano",
3863                ArrowDataType::Timestamp(
3864                    arrow::datatypes::TimeUnit::Nanosecond,
3865                    Some("+10:00".into()),
3866                ),
3867                false,
3868            ),
3869        ]);
3870
3871        let supplied_schema = Arc::new(Schema::new(vec![
3872            Field::new(
3873                "int32_to_ts_second",
3874                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3875                false,
3876            ),
3877            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3878            Field::new(
3879                "nested",
3880                ArrowDataType::Struct(supplied_nested_fields),
3881                false,
3882            ),
3883        ]));
3884
3885        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3886        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3887            file.try_clone().unwrap(),
3888            options,
3889        )
3890        .expect("reader builder with schema")
3891        .build()
3892        .expect("reader with schema");
3893
3894        assert_eq!(arrow_reader.schema(), supplied_schema);
3895        let batch = arrow_reader.next().unwrap().unwrap();
3896        assert_eq!(batch.num_columns(), 3);
3897        assert_eq!(batch.num_rows(), 4);
3898        assert_eq!(
3899            batch
3900                .column(0)
3901                .as_any()
3902                .downcast_ref::<TimestampSecondArray>()
3903                .expect("downcast to timestamp second")
3904                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3905                .map(|v| v.to_string())
3906                .expect("value as datetime"),
3907            "1970-01-01 01:00:00 +01:00"
3908        );
3909        assert_eq!(
3910            batch
3911                .column(1)
3912                .as_any()
3913                .downcast_ref::<Date64Array>()
3914                .expect("downcast to date64")
3915                .value_as_date(0)
3916                .map(|v| v.to_string())
3917                .expect("value as date"),
3918            "1970-01-01"
3919        );
3920
3921        let nested = batch
3922            .column(2)
3923            .as_any()
3924            .downcast_ref::<StructArray>()
3925            .expect("downcast to struct");
3926
3927        let nested_dict = nested
3928            .column(0)
3929            .as_any()
3930            .downcast_ref::<Int32DictionaryArray>()
3931            .expect("downcast to dictionary");
3932
3933        assert_eq!(
3934            nested_dict
3935                .values()
3936                .as_any()
3937                .downcast_ref::<StringArray>()
3938                .expect("downcast to string")
3939                .iter()
3940                .collect::<Vec<_>>(),
3941            vec![Some("a"), Some("b")]
3942        );
3943
3944        assert_eq!(
3945            nested_dict.keys().iter().collect::<Vec<_>>(),
3946            vec![Some(0), Some(0), Some(0), Some(1)]
3947        );
3948
3949        assert_eq!(
3950            nested
3951                .column(1)
3952                .as_any()
3953                .downcast_ref::<TimestampNanosecondArray>()
3954                .expect("downcast to timestamp nanosecond")
3955                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3956                .map(|v| v.to_string())
3957                .expect("value as datetime"),
3958            "1970-01-01 10:00:00.000000001 +10:00"
3959        );
3960    }
3961
3962    #[test]
3963    fn test_empty_projection() {
3964        let testdata = arrow::util::test_util::parquet_test_data();
3965        let path = format!("{testdata}/alltypes_plain.parquet");
3966        let file = File::open(path).unwrap();
3967
3968        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3969        let file_metadata = builder.metadata().file_metadata();
3970        let expected_rows = file_metadata.num_rows() as usize;
3971
3972        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3973        let batch_reader = builder
3974            .with_projection(mask)
3975            .with_batch_size(2)
3976            .build()
3977            .unwrap();
3978
3979        let mut total_rows = 0;
3980        for maybe_batch in batch_reader {
3981            let batch = maybe_batch.unwrap();
3982            total_rows += batch.num_rows();
3983            assert_eq!(batch.num_columns(), 0);
3984            assert!(batch.num_rows() <= 2);
3985        }
3986
3987        assert_eq!(total_rows, expected_rows);
3988    }
3989
3990    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3991        let schema = Arc::new(Schema::new(vec![Field::new(
3992            "list",
3993            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3994            true,
3995        )]));
3996
3997        let mut buf = Vec::with_capacity(1024);
3998
3999        let mut writer = ArrowWriter::try_new(
4000            &mut buf,
4001            schema.clone(),
4002            Some(
4003                WriterProperties::builder()
4004                    .set_max_row_group_size(row_group_size)
4005                    .build(),
4006            ),
4007        )
4008        .unwrap();
4009        for _ in 0..2 {
4010            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4011            for _ in 0..(batch_size) {
4012                list_builder.append(true);
4013            }
4014            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4015                .unwrap();
4016            writer.write(&batch).unwrap();
4017        }
4018        writer.close().unwrap();
4019
4020        let mut record_reader =
4021            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4022        assert_eq!(
4023            batch_size,
4024            record_reader.next().unwrap().unwrap().num_rows()
4025        );
4026        assert_eq!(
4027            batch_size,
4028            record_reader.next().unwrap().unwrap().num_rows()
4029        );
4030    }
4031
4032    #[test]
4033    fn test_row_group_exact_multiple() {
4034        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4035        test_row_group_batch(8, 8);
4036        test_row_group_batch(10, 8);
4037        test_row_group_batch(8, 10);
4038        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4039        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4040        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4041        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4042        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4043    }
4044
4045    /// Given a RecordBatch containing all the column data, return the expected batches given
4046    /// a `batch_size` and `selection`
4047    fn get_expected_batches(
4048        column: &RecordBatch,
4049        selection: &RowSelection,
4050        batch_size: usize,
4051    ) -> Vec<RecordBatch> {
4052        let mut expected_batches = vec![];
4053
4054        let mut selection: VecDeque<_> = selection.clone().into();
4055        let mut row_offset = 0;
4056        let mut last_start = None;
4057        while row_offset < column.num_rows() && !selection.is_empty() {
4058            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4059            while batch_remaining > 0 && !selection.is_empty() {
4060                let (to_read, skip) = match selection.front_mut() {
4061                    Some(selection) if selection.row_count > batch_remaining => {
4062                        selection.row_count -= batch_remaining;
4063                        (batch_remaining, selection.skip)
4064                    }
4065                    Some(_) => {
4066                        let select = selection.pop_front().unwrap();
4067                        (select.row_count, select.skip)
4068                    }
4069                    None => break,
4070                };
4071
4072                batch_remaining -= to_read;
4073
4074                match skip {
4075                    true => {
4076                        if let Some(last_start) = last_start.take() {
4077                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4078                        }
4079                        row_offset += to_read
4080                    }
4081                    false => {
4082                        last_start.get_or_insert(row_offset);
4083                        row_offset += to_read
4084                    }
4085                }
4086            }
4087        }
4088
4089        if let Some(last_start) = last_start.take() {
4090            expected_batches.push(column.slice(last_start, row_offset - last_start))
4091        }
4092
4093        // Sanity check, all batches except the final should be the batch size
4094        for batch in &expected_batches[..expected_batches.len() - 1] {
4095            assert_eq!(batch.num_rows(), batch_size);
4096        }
4097
4098        expected_batches
4099    }
4100
4101    fn create_test_selection(
4102        step_len: usize,
4103        total_len: usize,
4104        skip_first: bool,
4105    ) -> (RowSelection, usize) {
4106        let mut remaining = total_len;
4107        let mut skip = skip_first;
4108        let mut vec = vec![];
4109        let mut selected_count = 0;
4110        while remaining != 0 {
4111            let step = if remaining > step_len {
4112                step_len
4113            } else {
4114                remaining
4115            };
4116            vec.push(RowSelector {
4117                row_count: step,
4118                skip,
4119            });
4120            remaining -= step;
4121            if !skip {
4122                selected_count += step;
4123            }
4124            skip = !skip;
4125        }
4126        (vec.into(), selected_count)
4127    }
4128
4129    #[test]
4130    fn test_scan_row_with_selection() {
4131        let testdata = arrow::util::test_util::parquet_test_data();
4132        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4133        let test_file = File::open(&path).unwrap();
4134
4135        let mut serial_reader =
4136            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4137        let data = serial_reader.next().unwrap().unwrap();
4138
4139        let do_test = |batch_size: usize, selection_len: usize| {
4140            for skip_first in [false, true] {
4141                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4142
4143                let expected = get_expected_batches(&data, &selections, batch_size);
4144                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4145                assert_eq!(
4146                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4147                    expected,
4148                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4149                );
4150            }
4151        };
4152
4153        // total row count 7300
4154        // 1. test selection len more than one page row count
4155        do_test(1000, 1000);
4156
4157        // 2. test selection len less than one page row count
4158        do_test(20, 20);
4159
4160        // 3. test selection_len less than batch_size
4161        do_test(20, 5);
4162
4163        // 4. test selection_len more than batch_size
4164        // If batch_size < selection_len
4165        do_test(20, 5);
4166
4167        fn create_skip_reader(
4168            test_file: &File,
4169            batch_size: usize,
4170            selections: RowSelection,
4171        ) -> ParquetRecordBatchReader {
4172            let options = ArrowReaderOptions::new().with_page_index(true);
4173            let file = test_file.try_clone().unwrap();
4174            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4175                .unwrap()
4176                .with_batch_size(batch_size)
4177                .with_row_selection(selections)
4178                .build()
4179                .unwrap()
4180        }
4181    }
4182
4183    #[test]
4184    fn test_batch_size_overallocate() {
4185        let testdata = arrow::util::test_util::parquet_test_data();
4186        // `alltypes_plain.parquet` only have 8 rows
4187        let path = format!("{testdata}/alltypes_plain.parquet");
4188        let test_file = File::open(path).unwrap();
4189
4190        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4191        let num_rows = builder.metadata.file_metadata().num_rows();
4192        let reader = builder
4193            .with_batch_size(1024)
4194            .with_projection(ProjectionMask::all())
4195            .build()
4196            .unwrap();
4197        assert_ne!(1024, num_rows);
4198        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4199    }
4200
4201    #[test]
4202    fn test_read_with_page_index_enabled() {
4203        let testdata = arrow::util::test_util::parquet_test_data();
4204
4205        {
4206            // `alltypes_tiny_pages.parquet` has page index
4207            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4208            let test_file = File::open(path).unwrap();
4209            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4210                test_file,
4211                ArrowReaderOptions::new().with_page_index(true),
4212            )
4213            .unwrap();
4214            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4215            let reader = builder.build().unwrap();
4216            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4217            assert_eq!(batches.len(), 8);
4218        }
4219
4220        {
4221            // `alltypes_plain.parquet` doesn't have page index
4222            let path = format!("{testdata}/alltypes_plain.parquet");
4223            let test_file = File::open(path).unwrap();
4224            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4225                test_file,
4226                ArrowReaderOptions::new().with_page_index(true),
4227            )
4228            .unwrap();
4229            // Although `Vec<Vec<PageLoacation>>` of each row group is empty,
4230            // we should read the file successfully.
4231            assert!(builder.metadata().offset_index().is_none());
4232            let reader = builder.build().unwrap();
4233            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4234            assert_eq!(batches.len(), 1);
4235        }
4236    }
4237
4238    #[test]
4239    fn test_raw_repetition() {
4240        const MESSAGE_TYPE: &str = "
4241            message Log {
4242              OPTIONAL INT32 eventType;
4243              REPEATED INT32 category;
4244              REPEATED group filter {
4245                OPTIONAL INT32 error;
4246              }
4247            }
4248        ";
4249        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4250        let props = Default::default();
4251
4252        let mut buf = Vec::with_capacity(1024);
4253        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4254        let mut row_group_writer = writer.next_row_group().unwrap();
4255
4256        // column 0
4257        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4258        col_writer
4259            .typed::<Int32Type>()
4260            .write_batch(&[1], Some(&[1]), None)
4261            .unwrap();
4262        col_writer.close().unwrap();
4263        // column 1
4264        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4265        col_writer
4266            .typed::<Int32Type>()
4267            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4268            .unwrap();
4269        col_writer.close().unwrap();
4270        // column 2
4271        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4272        col_writer
4273            .typed::<Int32Type>()
4274            .write_batch(&[1], Some(&[1]), Some(&[0]))
4275            .unwrap();
4276        col_writer.close().unwrap();
4277
4278        let rg_md = row_group_writer.close().unwrap();
4279        assert_eq!(rg_md.num_rows(), 1);
4280        writer.close().unwrap();
4281
4282        let bytes = Bytes::from(buf);
4283
4284        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4285        let full = no_mask.next().unwrap().unwrap();
4286
4287        assert_eq!(full.num_columns(), 3);
4288
4289        for idx in 0..3 {
4290            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4291            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4292            let mut reader = b.with_projection(mask).build().unwrap();
4293            let projected = reader.next().unwrap().unwrap();
4294
4295            assert_eq!(projected.num_columns(), 1);
4296            assert_eq!(full.column(idx), projected.column(0));
4297        }
4298    }
4299
4300    #[test]
4301    fn test_read_lz4_raw() {
4302        let testdata = arrow::util::test_util::parquet_test_data();
4303        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4304        let file = File::open(path).unwrap();
4305
4306        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4307            .unwrap()
4308            .collect::<Result<Vec<_>, _>>()
4309            .unwrap();
4310        assert_eq!(batches.len(), 1);
4311        let batch = &batches[0];
4312
4313        assert_eq!(batch.num_columns(), 3);
4314        assert_eq!(batch.num_rows(), 4);
4315
4316        // https://github.com/apache/parquet-testing/pull/18
4317        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4318        assert_eq!(
4319            a.values(),
4320            &[1593604800, 1593604800, 1593604801, 1593604801]
4321        );
4322
4323        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4324        let a: Vec<_> = a.iter().flatten().collect();
4325        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4326
4327        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4328        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4329    }
4330
4331    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
4332    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
4333    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4334    //    LZ4_HADOOP algorithm for compression.
4335    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
4336    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
4337    //    older parquet-cpp versions.
4338    //
4339    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4340    #[test]
4341    fn test_read_lz4_hadoop_fallback() {
4342        for file in [
4343            "hadoop_lz4_compressed.parquet",
4344            "non_hadoop_lz4_compressed.parquet",
4345        ] {
4346            let testdata = arrow::util::test_util::parquet_test_data();
4347            let path = format!("{testdata}/{file}");
4348            let file = File::open(path).unwrap();
4349            let expected_rows = 4;
4350
4351            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4352                .unwrap()
4353                .collect::<Result<Vec<_>, _>>()
4354                .unwrap();
4355            assert_eq!(batches.len(), 1);
4356            let batch = &batches[0];
4357
4358            assert_eq!(batch.num_columns(), 3);
4359            assert_eq!(batch.num_rows(), expected_rows);
4360
4361            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4362            assert_eq!(
4363                a.values(),
4364                &[1593604800, 1593604800, 1593604801, 1593604801]
4365            );
4366
4367            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4368            let b: Vec<_> = b.iter().flatten().collect();
4369            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4370
4371            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4372            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4373        }
4374    }
4375
4376    #[test]
4377    fn test_read_lz4_hadoop_large() {
4378        let testdata = arrow::util::test_util::parquet_test_data();
4379        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4380        let file = File::open(path).unwrap();
4381        let expected_rows = 10000;
4382
4383        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4384            .unwrap()
4385            .collect::<Result<Vec<_>, _>>()
4386            .unwrap();
4387        assert_eq!(batches.len(), 1);
4388        let batch = &batches[0];
4389
4390        assert_eq!(batch.num_columns(), 1);
4391        assert_eq!(batch.num_rows(), expected_rows);
4392
4393        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4394        let a: Vec<_> = a.iter().flatten().collect();
4395        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4396        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4397        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4398        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4399    }
4400
4401    #[test]
4402    #[cfg(feature = "snap")]
4403    fn test_read_nested_lists() {
4404        let testdata = arrow::util::test_util::parquet_test_data();
4405        let path = format!("{testdata}/nested_lists.snappy.parquet");
4406        let file = File::open(path).unwrap();
4407
4408        let f = file.try_clone().unwrap();
4409        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4410        let expected = reader.next().unwrap().unwrap();
4411        assert_eq!(expected.num_rows(), 3);
4412
4413        let selection = RowSelection::from(vec![
4414            RowSelector::skip(1),
4415            RowSelector::select(1),
4416            RowSelector::skip(1),
4417        ]);
4418        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4419            .unwrap()
4420            .with_row_selection(selection)
4421            .build()
4422            .unwrap();
4423
4424        let actual = reader.next().unwrap().unwrap();
4425        assert_eq!(actual.num_rows(), 1);
4426        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4427    }
4428
4429    #[test]
4430    fn test_arbitrary_decimal() {
4431        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4432        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4433            .with_precision_and_scale(19, 0)
4434            .unwrap();
4435        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4436            .with_precision_and_scale(12, 0)
4437            .unwrap();
4438        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4439            .with_precision_and_scale(17, 10)
4440            .unwrap();
4441
4442        let written = RecordBatch::try_from_iter([
4443            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4444            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4445            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4446        ])
4447        .unwrap();
4448
4449        let mut buffer = Vec::with_capacity(1024);
4450        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4451        writer.write(&written).unwrap();
4452        writer.close().unwrap();
4453
4454        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4455            .unwrap()
4456            .collect::<Result<Vec<_>, _>>()
4457            .unwrap();
4458
4459        assert_eq!(&written.slice(0, 8), &read[0]);
4460    }
4461
4462    #[test]
4463    fn test_list_skip() {
4464        let mut list = ListBuilder::new(Int32Builder::new());
4465        list.append_value([Some(1), Some(2)]);
4466        list.append_value([Some(3)]);
4467        list.append_value([Some(4)]);
4468        let list = list.finish();
4469        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4470
4471        // First page contains 2 values but only 1 row
4472        let props = WriterProperties::builder()
4473            .set_data_page_row_count_limit(1)
4474            .set_write_batch_size(2)
4475            .build();
4476
4477        let mut buffer = Vec::with_capacity(1024);
4478        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4479        writer.write(&batch).unwrap();
4480        writer.close().unwrap();
4481
4482        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4483        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4484            .unwrap()
4485            .with_row_selection(selection.into())
4486            .build()
4487            .unwrap();
4488        let out = reader.next().unwrap().unwrap();
4489        assert_eq!(out.num_rows(), 1);
4490        assert_eq!(out, batch.slice(2, 1));
4491    }
4492
4493    fn test_decimal32_roundtrip() {
4494        let d = |values: Vec<i32>, p: u8| {
4495            let iter = values.into_iter();
4496            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4497                .with_precision_and_scale(p, 2)
4498                .unwrap()
4499        };
4500
4501        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4502        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4503
4504        let mut buffer = Vec::with_capacity(1024);
4505        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4506        writer.write(&batch).unwrap();
4507        writer.close().unwrap();
4508
4509        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4510        let t1 = builder.parquet_schema().columns()[0].physical_type();
4511        assert_eq!(t1, PhysicalType::INT32);
4512
4513        let mut reader = builder.build().unwrap();
4514        assert_eq!(batch.schema(), reader.schema());
4515
4516        let out = reader.next().unwrap().unwrap();
4517        assert_eq!(batch, out);
4518    }
4519
4520    fn test_decimal64_roundtrip() {
4521        // Precision <= 9 -> INT32
4522        // Precision <= 18 -> INT64
4523
4524        let d = |values: Vec<i64>, p: u8| {
4525            let iter = values.into_iter();
4526            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4527                .with_precision_and_scale(p, 2)
4528                .unwrap()
4529        };
4530
4531        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4532        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4533        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4534
4535        let batch = RecordBatch::try_from_iter([
4536            ("d1", Arc::new(d1) as ArrayRef),
4537            ("d2", Arc::new(d2) as ArrayRef),
4538            ("d3", Arc::new(d3) as ArrayRef),
4539        ])
4540        .unwrap();
4541
4542        let mut buffer = Vec::with_capacity(1024);
4543        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4544        writer.write(&batch).unwrap();
4545        writer.close().unwrap();
4546
4547        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4548        let t1 = builder.parquet_schema().columns()[0].physical_type();
4549        assert_eq!(t1, PhysicalType::INT32);
4550        let t2 = builder.parquet_schema().columns()[1].physical_type();
4551        assert_eq!(t2, PhysicalType::INT64);
4552        let t3 = builder.parquet_schema().columns()[2].physical_type();
4553        assert_eq!(t3, PhysicalType::INT64);
4554
4555        let mut reader = builder.build().unwrap();
4556        assert_eq!(batch.schema(), reader.schema());
4557
4558        let out = reader.next().unwrap().unwrap();
4559        assert_eq!(batch, out);
4560    }
4561
4562    fn test_decimal_roundtrip<T: DecimalType>() {
4563        // Precision <= 9 -> INT32
4564        // Precision <= 18 -> INT64
4565        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4566
4567        let d = |values: Vec<usize>, p: u8| {
4568            let iter = values.into_iter().map(T::Native::usize_as);
4569            PrimitiveArray::<T>::from_iter_values(iter)
4570                .with_precision_and_scale(p, 2)
4571                .unwrap()
4572        };
4573
4574        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4575        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4576        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4577        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4578
4579        let batch = RecordBatch::try_from_iter([
4580            ("d1", Arc::new(d1) as ArrayRef),
4581            ("d2", Arc::new(d2) as ArrayRef),
4582            ("d3", Arc::new(d3) as ArrayRef),
4583            ("d4", Arc::new(d4) as ArrayRef),
4584        ])
4585        .unwrap();
4586
4587        let mut buffer = Vec::with_capacity(1024);
4588        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4589        writer.write(&batch).unwrap();
4590        writer.close().unwrap();
4591
4592        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4593        let t1 = builder.parquet_schema().columns()[0].physical_type();
4594        assert_eq!(t1, PhysicalType::INT32);
4595        let t2 = builder.parquet_schema().columns()[1].physical_type();
4596        assert_eq!(t2, PhysicalType::INT64);
4597        let t3 = builder.parquet_schema().columns()[2].physical_type();
4598        assert_eq!(t3, PhysicalType::INT64);
4599        let t4 = builder.parquet_schema().columns()[3].physical_type();
4600        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4601
4602        let mut reader = builder.build().unwrap();
4603        assert_eq!(batch.schema(), reader.schema());
4604
4605        let out = reader.next().unwrap().unwrap();
4606        assert_eq!(batch, out);
4607    }
4608
4609    #[test]
4610    fn test_decimal() {
4611        test_decimal32_roundtrip();
4612        test_decimal64_roundtrip();
4613        test_decimal_roundtrip::<Decimal128Type>();
4614        test_decimal_roundtrip::<Decimal256Type>();
4615    }
4616
4617    #[test]
4618    fn test_list_selection() {
4619        let schema = Arc::new(Schema::new(vec![Field::new_list(
4620            "list",
4621            Field::new_list_field(ArrowDataType::Utf8, true),
4622            false,
4623        )]));
4624        let mut buf = Vec::with_capacity(1024);
4625
4626        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4627
4628        for i in 0..2 {
4629            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4630            for j in 0..1024 {
4631                list_a_builder.values().append_value(format!("{i} {j}"));
4632                list_a_builder.append(true);
4633            }
4634            let batch =
4635                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4636                    .unwrap();
4637            writer.write(&batch).unwrap();
4638        }
4639        let _metadata = writer.close().unwrap();
4640
4641        let buf = Bytes::from(buf);
4642        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4643            .unwrap()
4644            .with_row_selection(RowSelection::from(vec![
4645                RowSelector::skip(100),
4646                RowSelector::select(924),
4647                RowSelector::skip(100),
4648                RowSelector::select(924),
4649            ]))
4650            .build()
4651            .unwrap();
4652
4653        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4654        let batch = concat_batches(&schema, &batches).unwrap();
4655
4656        assert_eq!(batch.num_rows(), 924 * 2);
4657        let list = batch.column(0).as_list::<i32>();
4658
4659        for w in list.value_offsets().windows(2) {
4660            assert_eq!(w[0] + 1, w[1])
4661        }
4662        let mut values = list.values().as_string::<i32>().iter();
4663
4664        for i in 0..2 {
4665            for j in 100..1024 {
4666                let expected = format!("{i} {j}");
4667                assert_eq!(values.next().unwrap().unwrap(), &expected);
4668            }
4669        }
4670    }
4671
4672    #[test]
4673    fn test_list_selection_fuzz() {
4674        let mut rng = rng();
4675        let schema = Arc::new(Schema::new(vec![Field::new_list(
4676            "list",
4677            Field::new_list(
4678                Field::LIST_FIELD_DEFAULT_NAME,
4679                Field::new_list_field(ArrowDataType::Int32, true),
4680                true,
4681            ),
4682            true,
4683        )]));
4684        let mut buf = Vec::with_capacity(1024);
4685        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4686
4687        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4688
4689        for _ in 0..2048 {
4690            if rng.random_bool(0.2) {
4691                list_a_builder.append(false);
4692                continue;
4693            }
4694
4695            let list_a_len = rng.random_range(0..10);
4696            let list_b_builder = list_a_builder.values();
4697
4698            for _ in 0..list_a_len {
4699                if rng.random_bool(0.2) {
4700                    list_b_builder.append(false);
4701                    continue;
4702                }
4703
4704                let list_b_len = rng.random_range(0..10);
4705                let int_builder = list_b_builder.values();
4706                for _ in 0..list_b_len {
4707                    match rng.random_bool(0.2) {
4708                        true => int_builder.append_null(),
4709                        false => int_builder.append_value(rng.random()),
4710                    }
4711                }
4712                list_b_builder.append(true)
4713            }
4714            list_a_builder.append(true);
4715        }
4716
4717        let array = Arc::new(list_a_builder.finish());
4718        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4719
4720        writer.write(&batch).unwrap();
4721        let _metadata = writer.close().unwrap();
4722
4723        let buf = Bytes::from(buf);
4724
4725        let cases = [
4726            vec![
4727                RowSelector::skip(100),
4728                RowSelector::select(924),
4729                RowSelector::skip(100),
4730                RowSelector::select(924),
4731            ],
4732            vec![
4733                RowSelector::select(924),
4734                RowSelector::skip(100),
4735                RowSelector::select(924),
4736                RowSelector::skip(100),
4737            ],
4738            vec![
4739                RowSelector::skip(1023),
4740                RowSelector::select(1),
4741                RowSelector::skip(1023),
4742                RowSelector::select(1),
4743            ],
4744            vec![
4745                RowSelector::select(1),
4746                RowSelector::skip(1023),
4747                RowSelector::select(1),
4748                RowSelector::skip(1023),
4749            ],
4750        ];
4751
4752        for batch_size in [100, 1024, 2048] {
4753            for selection in &cases {
4754                let selection = RowSelection::from(selection.clone());
4755                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4756                    .unwrap()
4757                    .with_row_selection(selection.clone())
4758                    .with_batch_size(batch_size)
4759                    .build()
4760                    .unwrap();
4761
4762                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4763                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4764                assert_eq!(actual.num_rows(), selection.row_count());
4765
4766                let mut batch_offset = 0;
4767                let mut actual_offset = 0;
4768                for selector in selection.iter() {
4769                    if selector.skip {
4770                        batch_offset += selector.row_count;
4771                        continue;
4772                    }
4773
4774                    assert_eq!(
4775                        batch.slice(batch_offset, selector.row_count),
4776                        actual.slice(actual_offset, selector.row_count)
4777                    );
4778
4779                    batch_offset += selector.row_count;
4780                    actual_offset += selector.row_count;
4781                }
4782            }
4783        }
4784    }
4785
4786    #[test]
4787    fn test_read_old_nested_list() {
4788        use arrow::datatypes::DataType;
4789        use arrow::datatypes::ToByteSlice;
4790
4791        let testdata = arrow::util::test_util::parquet_test_data();
4792        // message my_record {
4793        //     REQUIRED group a (LIST) {
4794        //         REPEATED group array (LIST) {
4795        //             REPEATED INT32 array;
4796        //         }
4797        //     }
4798        // }
4799        // should be read as list<list<int32>>
4800        let path = format!("{testdata}/old_list_structure.parquet");
4801        let test_file = File::open(path).unwrap();
4802
4803        // create expected ListArray
4804        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4805
4806        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4807        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4808
4809        // Construct a list array from the above two
4810        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4811            "array",
4812            DataType::Int32,
4813            false,
4814        ))))
4815        .len(2)
4816        .add_buffer(a_value_offsets)
4817        .add_child_data(a_values.into_data())
4818        .build()
4819        .unwrap();
4820        let a = ListArray::from(a_list_data);
4821
4822        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4823        let mut reader = builder.build().unwrap();
4824        let out = reader.next().unwrap().unwrap();
4825        assert_eq!(out.num_rows(), 1);
4826        assert_eq!(out.num_columns(), 1);
4827        // grab first column
4828        let c0 = out.column(0);
4829        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4830        // get first row: [[1, 2], [3, 4]]
4831        let r0 = c0arr.value(0);
4832        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4833        assert_eq!(r0arr, &a);
4834    }
4835
4836    #[test]
4837    fn test_map_no_value() {
4838        // File schema:
4839        // message schema {
4840        //   required group my_map (MAP) {
4841        //     repeated group key_value {
4842        //       required int32 key;
4843        //       optional int32 value;
4844        //     }
4845        //   }
4846        //   required group my_map_no_v (MAP) {
4847        //     repeated group key_value {
4848        //       required int32 key;
4849        //     }
4850        //   }
4851        //   required group my_list (LIST) {
4852        //     repeated group list {
4853        //       required int32 element;
4854        //     }
4855        //   }
4856        // }
4857        let testdata = arrow::util::test_util::parquet_test_data();
4858        let path = format!("{testdata}/map_no_value.parquet");
4859        let file = File::open(path).unwrap();
4860
4861        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4862            .unwrap()
4863            .build()
4864            .unwrap();
4865        let out = reader.next().unwrap().unwrap();
4866        assert_eq!(out.num_rows(), 3);
4867        assert_eq!(out.num_columns(), 3);
4868        // my_map_no_v and my_list columns should now be equivalent
4869        let c0 = out.column(1).as_list::<i32>();
4870        let c1 = out.column(2).as_list::<i32>();
4871        assert_eq!(c0.len(), c1.len());
4872        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4873    }
4874
4875    #[test]
4876    fn test_get_row_group_column_bloom_filter_with_length() {
4877        // convert to new parquet file with bloom_filter_length
4878        let testdata = arrow::util::test_util::parquet_test_data();
4879        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4880        let file = File::open(path).unwrap();
4881        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4882        let schema = builder.schema().clone();
4883        let reader = builder.build().unwrap();
4884
4885        let mut parquet_data = Vec::new();
4886        let props = WriterProperties::builder()
4887            .set_bloom_filter_enabled(true)
4888            .build();
4889        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
4890        for batch in reader {
4891            let batch = batch.unwrap();
4892            writer.write(&batch).unwrap();
4893        }
4894        writer.close().unwrap();
4895
4896        // test the new parquet file
4897        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
4898    }
4899
4900    #[test]
4901    fn test_get_row_group_column_bloom_filter_without_length() {
4902        let testdata = arrow::util::test_util::parquet_test_data();
4903        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4904        let data = Bytes::from(std::fs::read(path).unwrap());
4905        test_get_row_group_column_bloom_filter(data, false);
4906    }
4907
4908    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
4909        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
4910
4911        let metadata = builder.metadata();
4912        assert_eq!(metadata.num_row_groups(), 1);
4913        let row_group = metadata.row_group(0);
4914        let column = row_group.column(0);
4915        assert_eq!(column.bloom_filter_length().is_some(), with_length);
4916
4917        let sbbf = builder
4918            .get_row_group_column_bloom_filter(0, 0)
4919            .unwrap()
4920            .unwrap();
4921        assert!(sbbf.check(&"Hello"));
4922        assert!(!sbbf.check(&"Hello_Not_Exists"));
4923    }
4924}