parquet/arrow/arrow_reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains readers that read Parquet data into Arrow [`RecordBatch`]es

use arrow_array::Array;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

/// Builder for constructing Parquet readers that decode into [Apache Arrow]
/// arrays.
///
/// Most users should use one of the following specializations:
///
/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
/// * decoder API: [`ParquetPushDecoderBuilder`]
///
/// # Features
/// * Projection pushdown: [`Self::with_projection`]
/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Offset skipping: [`Self::with_offset`] and [`Self::with_limit`]
/// * Row group filtering: [`Self::with_row_groups`]
/// * Range filtering: [`Self::with_row_selection`]
/// * Row level filtering: [`Self::with_row_filter`]
///
/// # Implementing Predicate Pushdown
///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and allows the most low-level optimizations.
///
/// However, most Parquet-based systems will apply filters at many steps prior
/// to decoding, such as pruning files, row groups and data pages. This crate
/// provides the low level APIs needed to implement such filtering, but does not
/// include any logic to actually evaluate predicates. For example:
///
/// * [`Self::with_row_groups`] for Row Group pruning
/// * [`Self::with_row_selection`] for data page pruning
/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
///
/// The rationale for this design is that implementing predicate pushdown is a
/// complex topic and varies significantly from system to system. For example:
///
/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc.?)
/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
///
/// You can read more about this design in the [Querying Parquet with
/// Millisecond Latency] Arrow blog post.
///
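/// # Example
///
/// The following sketch (using a small in-memory file written inline purely
/// for illustration) combines row group pruning, a [`RowSelection`], and a
/// [`RowFilter`]. In a real system the pruned row groups and the selection
/// would typically be derived from statistics, the page index, or an external
/// catalog rather than hard coded.
///
/// ```rust
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::Int32Type;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow::compute::kernels::cmp::gt;
/// # use parquet::arrow::{ArrowWriter, ProjectionMask};
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
/// # };
/// # let mut file: Vec<u8> = Vec::with_capacity(1024);
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// # let file = Bytes::from(file);
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
/// // Row group pruning: only decode row group 0
/// let builder = builder.with_row_groups(vec![0]);
/// // Range filtering: skip the first row, decode the remaining three
/// let selection = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(3)]);
/// let builder = builder.with_row_selection(selection);
/// // Row level filtering: only return rows where "i32" > 2
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// let predicate = ArrowPredicateFn::new(mask, |batch| {
///     gt(batch.column(0).as_primitive::<Int32Type>(), &Int32Array::new_scalar(2))
/// });
/// let reader = builder
///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
///     .build()
///     .unwrap();
/// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
/// assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
/// ```
///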
/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
/// [Apache Arrow]: https://arrow.apache.org/
/// [`StatisticsConverter`]: statistics::StatisticsConverter
/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
    /// The "input" to read parquet data from.
    ///
    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
    ///
    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If `batch_size` exceeds the number of rows in the file, the file's row count is used instead.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    ///
    /// This is also called row group filtering
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the provided column indexes
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
    /// data into memory.
    ///
    /// This feature is used to restrict which rows are decoded within row
    /// groups, skipping ranges of rows that are not needed. Such selections
    /// could be determined by evaluating predicates against the parquet page
    /// [`Index`] or some other external information available to a query
    /// engine.
    ///
    /// # Notes
    ///
    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
    /// applying the row selection, and therefore rows from skipped row groups
    /// should not be included in the [`RowSelection`] (see example below)
    ///
    /// It is recommended to enable writing the page index if using this
    /// functionality, to allow more efficient skipping over data pages. See
    /// [`ArrowReaderOptions::with_page_index`].
    ///
    /// # Example
    ///
    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
    /// row group 3:
    ///
    /// ```text
    ///   Row Group 0, 1000 rows (selected)
    ///   Row Group 1, 1000 rows (skipped)
    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
    /// ```
    ///
    /// You could pass the following [`RowSelection`]:
    ///
    /// ```text
    ///  Select 1000    (scan all rows in row group 0)
    ///  Skip 50        (skip the first 50 rows in row group 2)
    ///  Select 50      (scan rows 50-100 in row group 2)
    ///  Skip 900       (skip the remaining rows in row group 2)
    ///  Skip 200       (skip the first 200 rows in row group 3)
    ///  Select 100     (scan rows 200-300 in row group 3)
    ///  Skip 700       (skip the remaining rows in row group 3)
    /// ```
    /// Note there is no entry for the (entirely) skipped row group 1.
    ///
    /// Note you can represent the same selection with fewer entries. Instead of
    ///
    /// ```text
    ///  Skip 900       (skip the remaining rows in row group 2)
    ///  Skip 200       (skip the first 200 rows in row group 3)
    /// ```
    ///
    /// you could use
    ///
    /// ```text
    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
    /// ```
    ///
    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
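    ///
    /// A sketch of constructing that selection programmatically (the values
    /// mirror the diagram above):
    ///
    /// ```rust
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// // pass to the builder via `with_row_selection(selection)`
    /// ```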
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    /// Provide a [`RowFilter`] to skip decoding rows
    ///
    /// Row filters are applied after row group selection and row selection
    ///
    /// It is recommended to enable reading the page index if using this functionality, to allow
    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
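    ///
    /// A minimal sketch of a filter that keeps only rows where the first
    /// projected column is greater than 42 (the column index and threshold are
    /// illustrative only):
    ///
    /// ```rust
    /// # use arrow_array::Int32Array;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow::compute::kernels::cmp::gt;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    /// let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
    ///     gt(batch.column(0).as_primitive::<Int32Type>(), &Int32Array::new_scalar(42))
    /// });
    /// let filter = RowFilter::new(vec![Box::new(predicate)]);
    /// // pass to the builder via `with_row_filter(filter)`
    /// ```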
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Provide a limit to the number of rows to be read
    ///
    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
    /// allowing it to limit the final set of rows decoded after any pushed down predicates
    ///
    /// It is recommended to enable reading the page index if using this functionality, to allow
    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
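    ///
    /// For example, to skip the first 100 rows that pass any filters and then
    /// decode at most the next 10 (a small in-memory file is written inline
    /// here purely so the example is self contained):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..200))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_offset(100)
    ///     .with_limit(10)
    ///     .build()
    ///     .unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 10);
    /// ```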
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Provide an offset to skip over the given number of rows
    ///
    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
    /// allowing it to skip rows after any pushed down predicates
    ///
    /// It is recommended to enable reading the page index if using this functionality, to allow
    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Specify metrics collection during reading
    ///
    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
    /// clone of the provided metrics to the builder.
    ///
    /// For example:
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // Create metrics object to pass into the reader
    /// let metrics = ArrowReaderMetrics::enabled();
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
    ///   // Configure the builder to use the metrics by passing a clone
    ///   .with_metrics(metrics.clone())
    ///   // Build the reader
    ///   .build().unwrap();
    /// // .. read data from the reader ..
    ///
    /// // check the metrics
    /// assert!(metrics.records_read_from_inner().is_some());
    /// ```
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size (per row group) of the predicate cache in bytes for
    /// the async decoder.
    ///
    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
    /// unlimited cache size.
    ///
    /// This cache is used to store decoded arrays that are used in
    /// predicate evaluation ([`Self::with_row_filter`]).
    ///
    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
    /// [this ticket] for more details and alternatives.
    ///
    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

/// Options that control how [`ParquetMetaData`] is read when constructing
/// an Arrow reader.
///
/// To use these options, pass them to one of the following methods:
/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
///
/// For fine-grained control over metadata loading, use
/// [`ArrowReaderMetadata::load`] to load metadata with these options.
///
/// See [`ArrowReaderBuilder`] for how to configure how the column data
/// is then read from the file, including projection and filter pushdown.
///
/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
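///
/// # Example
///
/// For example, to read the page index and ignore any Arrow schema embedded
/// in the file metadata:
///
/// ```rust
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// let options = ArrowReaderOptions::new()
///     .with_page_index(true)
///     .with_skip_arrow_metadata(true);
/// // pass `options` to `try_new_with_options` / `new_with_options`,
/// // or to `ArrowReaderMetadata::load`
/// ```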
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader strip any user defined metadata from the Arrow schema
    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema,
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
    ///
    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
    supplied_schema: Option<SchemaRef>,
    /// Policy for reading offset and column indexes.
    pub(crate) page_index_policy: PageIndexPolicy,
    /// Options to control reading of Parquet metadata
    metadata_options: ParquetMetaDataOptions,
    /// If encryption is enabled, the file decryption properties can be provided
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by some writers may contain embedded arrow
    /// schema and metadata.
    /// This may not be correct or compatible with your system,
    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Provide a schema hint to use when reading the Parquet file.
    ///
    /// If provided, this schema takes precedence over any arrow schema embedded
    /// in the metadata (see the [`arrow`] documentation for more details).
    ///
    /// If the provided schema is not compatible with the data stored in the
    /// parquet file schema, an error will be returned when constructing the
    /// builder.
    ///
    /// This option is only required if you want to explicitly control the
    /// conversion of Parquet types to Arrow types, such as casting a column to
    /// a different type. For example, if you wanted to read an Int64 in
    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
    ///
    /// [`arrow`]: crate::arrow
    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
    ///
    /// # Notes
    ///
    /// The provided schema must have the same number of columns as the parquet schema and
    /// the column names must be the same.
    ///
    /// # Example
    /// ```
    /// use std::sync::Arc;
    /// use tempfile::tempfile;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// use parquet::arrow::ArrowWriter;
    ///
    /// // Write data - schema is inferred from the data to be Int32
    /// let file = tempfile().unwrap();
    /// let batch = RecordBatch::try_from_iter(vec![
    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
    /// ]).unwrap();
    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    /// writer.close().unwrap();
    ///
    /// // Read the file back.
    /// // Supply a schema that interprets the Int32 column as a Timestamp.
    /// let supplied_schema = Arc::new(Schema::new(vec![
    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
    /// ]));
    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
    ///     file.try_clone().unwrap(),
    ///     options
    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
    ///
    /// // Create the reader and read the data using the supplied schema.
    /// let mut reader = builder.build().unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
    ///
    /// Some query engines use the `PageIndex` to push down predicates to the
    /// parquet scan, potentially eliminating unnecessary IO.
    ///
    /// If this is enabled, [`ParquetMetaData::column_index`] and
    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
    /// information is present in the file.
    ///
    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
    ///
    /// See [`Self::with_page_index`] for more details.
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
    /// footer will be skipped.
    ///
    /// This can be used to avoid reparsing the schema from the file when it is
    /// already known.
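    ///
    /// A sketch of reusing the schema parsed for an earlier reader (the small
    /// in-memory file is written inline only to make the example self
    /// contained):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// // Read the metadata once and keep the parsed Parquet schema
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file.clone()).unwrap();
    /// let parquet_schema = builder.metadata().file_metadata().schema_descr_ptr();
    ///
    /// // Later readers can skip re-parsing the schema from the footer
    /// let options = ArrowReaderOptions::new().with_parquet_schema(parquet_schema);
    /// let _builder =
    ///     ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    /// ```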
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Provide the file decryption properties to use when reading encrypted parquet files.
    ///
    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Retrieve the currently set page index behavior.
    ///
    /// This can be set via [`with_page_index`][Self::with_page_index].
    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    /// Retrieve the currently set metadata decoding options.
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Retrieve the currently set file decryption properties.
    ///
    /// This can be set via
    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

/// The metadata necessary to construct an [`ArrowReaderBuilder`]
///
/// Note this structure is cheaply clone-able as it consists of several `Arc`s.
///
/// This structure allows
///
/// 1. Loading metadata for a file once and then using that same metadata to
///    construct multiple separate readers, for example, to distribute readers
///    across multiple threads
///
/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
///    from the file each time a reader is constructed.
///
/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet Metadata, if known a priori
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow Schema
    pub(crate) schema: SchemaRef,
    /// The Parquet schema (root field)
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
    /// and [`ChunkReader`]
    ///
    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
    /// example of how this can be used
    ///
    /// # Notes
    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, but
    /// `Self::metadata` is missing the page index, this function will attempt
    /// to load the page index by making additional reads from `reader`.
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(options.page_index_policy)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
    ///
    /// # Notes
    ///
    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
        // different lengths, but we check here to be safe.
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
///
/// # See Also
/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a sans-IO decoder API
/// * [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// #
    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    ///
    /// // Inspect metadata
    /// assert_eq!(builder.metadata().num_row_groups(), 1);
    ///
    /// // Construct reader
    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
    ///
    /// // Read data
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
    ///
    /// Use this method if you want to control the options for reading the
    /// [`ParquetMetaData`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
    ///
    /// Use this method if you already have [`ParquetMetaData`] for a file.
    /// This interface allows:
    ///
    /// 1. Loading metadata once and using it to create multiple builders with
    ///    potentially different settings or run on different threads
    ///
    /// 2. Using a cached copy of the metadata rather than re-reading it from the
    ///    file each time a reader is constructed.
    ///
    /// See the docs on [`ArrowReaderMetadata`] for more details
    ///
    /// # Example
    /// ```
    /// # use std::fs::metadata;
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    /// # use parquet::arrow::ArrowWriter;
    /// #
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// #
    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
    ///
    /// // Should be able to read from both in parallel
    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
    /// ```
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Read bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
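    ///
    /// A minimal sketch (writing a small in-memory file with bloom filters
    /// enabled purely for illustration):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # use parquet::file::properties::WriterProperties;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let props = WriterProperties::builder().set_bloom_filter_enabled(true).build();
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(props)).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Read the bloom filter for column 0 of row group 0
    /// let sbbf = builder
    ///     .get_row_group_column_bloom_filter(0, 0)
    ///     .unwrap()
    ///     .expect("column has a bloom filter");
    /// // Probe the filter: values that are present always return true
    /// assert!(sbbf.check(&1_i32));
    /// ```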
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a [`ParquetRecordBatchReader`]
    ///
    /// Note: this will eagerly evaluate any `RowFilter` before returning
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size: _,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating a large buffer
        let batch_size = self
            .batch_size
            .min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Update selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if we have ruled out all rows
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    /// Optional list of row group indices to scan
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next SerializedPageReader
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // `offset_index` may not exist and `i[rg_idx]` will be empty.
        // To avoid a panic on `i[rg_idx][self.column_idx]`, we need to filter out empty `i[rg_idx]`.
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

/// Reads Parquet data as Arrow [`RecordBatch`]es
///
/// This struct implements the [`RecordBatchReader`] trait and is an
/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
///
/// It typically reads from either a file or an in-memory buffer such as [`Bytes`].
///
/// Created by [`ParquetRecordBatchReaderBuilder`]
///
/// [`Bytes`]: bytes::Bytes
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
    /// has reached the end of the file.
    ///
    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
    /// simplify error handling with `?`
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Currently, a RowSelector with row_count = 0 is interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
                    if front.row_count == 0 {
                        continue;
                    }

                    // try to read records
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // the selector spans more rows than this batch needs: read only
                            // what is needed and push the remainder back onto the selection
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// Note that the schema metadata will be stripped here. See
    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
    ///
    /// See [`ParquetRecordBatchReaderBuilder`] for more options
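    ///
    /// A short sketch (writing a small in-memory file inline so the example is
    /// self contained):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// for batch in reader {
    ///     println!("Read {} rows", batch.unwrap().num_rows());
    /// }
    /// ```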
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
    /// higher-level interface for reading parquet data from a file
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        // note metrics are not supported in this API
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
    /// a time from [`ArrayReader`] based on the configured `read_plan`. If the plan has no
    /// [`RowSelection`], all rows will be returned
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;
    use rand::{Rng, RngCore, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        // Verify that the metadata matches
        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
1427            &[
1428                Encoding::PLAIN,
1429                Encoding::RLE_DICTIONARY,
1430                Encoding::DELTA_BINARY_PACKED,
1431            ],
1432        );
1433    }
1434
1435    #[test]
1436    fn test_unsigned_roundtrip() {
1437        let schema = Arc::new(Schema::new(vec![
1438            Field::new("uint32", ArrowDataType::UInt32, true),
1439            Field::new("uint64", ArrowDataType::UInt64, true),
1440        ]));
1441
1442        let mut buf = Vec::with_capacity(1024);
1443        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1444
1445        let original = RecordBatch::try_new(
1446            schema,
1447            vec![
1448                Arc::new(UInt32Array::from_iter_values([
1449                    0,
1450                    i32::MAX as u32,
1451                    u32::MAX,
1452                ])),
1453                Arc::new(UInt64Array::from_iter_values([
1454                    0,
1455                    i64::MAX as u64,
1456                    u64::MAX,
1457                ])),
1458            ],
1459        )
1460        .unwrap();
1461
1462        writer.write(&original).unwrap();
1463        writer.close().unwrap();
1464
1465        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1466        let ret = reader.next().unwrap().unwrap();
1467        assert_eq!(ret, original);
1468
1469        // Check they can be downcast to the correct type
1470        ret.column(0)
1471            .as_any()
1472            .downcast_ref::<UInt32Array>()
1473            .unwrap();
1474
1475        ret.column(1)
1476            .as_any()
1477            .downcast_ref::<UInt64Array>()
1478            .unwrap();
1479    }
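
        // Editorial sketch (not part of the original test suite): Parquet's UINT_32 and
        // UINT_64 logical types are carried on the signed INT32/INT64 physical types, so
        // the u32::MAX / u64::MAX values written in test_unsigned_roundtrip above are
        // stored as the bit pattern of -1 and reinterpreted as unsigned on read. The test
        // name below is illustrative only.
        #[test]
        fn test_unsigned_reinterpretation_sketch() {
            assert_eq!(u32::MAX as i32, -1);
            assert_eq!((-1_i64) as u64, u64::MAX);
        }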
1480
1481    #[test]
1482    fn test_float16_roundtrip() -> Result<()> {
1483        let schema = Arc::new(Schema::new(vec![
1484            Field::new("float16", ArrowDataType::Float16, false),
1485            Field::new("float16-nullable", ArrowDataType::Float16, true),
1486        ]));
1487
1488        let mut buf = Vec::with_capacity(1024);
1489        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1490
1491        let original = RecordBatch::try_new(
1492            schema,
1493            vec![
1494                Arc::new(Float16Array::from_iter_values([
1495                    f16::EPSILON,
1496                    f16::MIN,
1497                    f16::MAX,
1498                    f16::NAN,
1499                    f16::INFINITY,
1500                    f16::NEG_INFINITY,
1501                    f16::ONE,
1502                    f16::NEG_ONE,
1503                    f16::ZERO,
1504                    f16::NEG_ZERO,
1505                    f16::E,
1506                    f16::PI,
1507                    f16::FRAC_1_PI,
1508                ])),
1509                Arc::new(Float16Array::from(vec![
1510                    None,
1511                    None,
1512                    None,
1513                    Some(f16::NAN),
1514                    Some(f16::INFINITY),
1515                    Some(f16::NEG_INFINITY),
1516                    None,
1517                    None,
1518                    None,
1519                    None,
1520                    None,
1521                    None,
1522                    Some(f16::FRAC_1_PI),
1523                ])),
1524            ],
1525        )?;
1526
1527        writer.write(&original)?;
1528        writer.close()?;
1529
1530        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1531        let ret = reader.next().unwrap()?;
1532        assert_eq!(ret, original);
1533
1534        // Ensure can be downcast to the correct type
1535        ret.column(0).as_primitive::<Float16Type>();
1536        ret.column(1).as_primitive::<Float16Type>();
1537
1538        Ok(())
1539    }
1540
1541    #[test]
1542    fn test_time_utc_roundtrip() -> Result<()> {
1543        let schema = Arc::new(Schema::new(vec![
1544            Field::new(
1545                "time_millis",
1546                ArrowDataType::Time32(TimeUnit::Millisecond),
1547                true,
1548            )
1549            .with_metadata(HashMap::from_iter(vec![(
1550                "adjusted_to_utc".to_string(),
1551                "".to_string(),
1552            )])),
1553            Field::new(
1554                "time_micros",
1555                ArrowDataType::Time64(TimeUnit::Microsecond),
1556                true,
1557            )
1558            .with_metadata(HashMap::from_iter(vec![(
1559                "adjusted_to_utc".to_string(),
1560                "".to_string(),
1561            )])),
1562        ]));
1563
1564        let mut buf = Vec::with_capacity(1024);
1565        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1566
1567        let original = RecordBatch::try_new(
1568            schema,
1569            vec![
1570                Arc::new(Time32MillisecondArray::from(vec![
1571                    Some(-1),
1572                    Some(0),
1573                    Some(86_399_000),
1574                    Some(86_400_000),
1575                    Some(86_401_000),
1576                    None,
1577                ])),
1578                Arc::new(Time64MicrosecondArray::from(vec![
1579                    Some(-1),
1580                    Some(0),
1581                    Some(86_399 * 1_000_000),
1582                    Some(86_400 * 1_000_000),
1583                    Some(86_401 * 1_000_000),
1584                    None,
1585                ])),
1586            ],
1587        )?;
1588
1589        writer.write(&original)?;
1590        writer.close()?;
1591
1592        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1593        let ret = reader.next().unwrap()?;
1594        assert_eq!(ret, original);
1595
1596        // Ensure can be downcast to the correct type
1597        ret.column(0).as_primitive::<Time32MillisecondType>();
1598        ret.column(1).as_primitive::<Time64MicrosecondType>();
1599
1600        Ok(())
1601    }
1602
1603    #[test]
1604    fn test_date32_roundtrip() -> Result<()> {
1605        use arrow_array::Date32Array;
1606
1607        let schema = Arc::new(Schema::new(vec![Field::new(
1608            "date32",
1609            ArrowDataType::Date32,
1610            false,
1611        )]));
1612
1613        let mut buf = Vec::with_capacity(1024);
1614
1615        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1616
1617        let original = RecordBatch::try_new(
1618            schema,
1619            vec![Arc::new(Date32Array::from(vec![
1620                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1621            ]))],
1622        )?;
1623
1624        writer.write(&original)?;
1625        writer.close()?;
1626
1627        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1628        let ret = reader.next().unwrap()?;
1629        assert_eq!(ret, original);
1630
1631        // Ensure can be downcast to the correct type
1632        ret.column(0).as_primitive::<Date32Type>();
1633
1634        Ok(())
1635    }
1636
1637    #[test]
1638    fn test_date64_roundtrip() -> Result<()> {
1639        use arrow_array::Date64Array;
1640
1641        let schema = Arc::new(Schema::new(vec![
1642            Field::new("small-date64", ArrowDataType::Date64, false),
1643            Field::new("big-date64", ArrowDataType::Date64, false),
1644            Field::new("invalid-date64", ArrowDataType::Date64, false),
1645        ]));
1646
1647        let mut default_buf = Vec::with_capacity(1024);
1648        let mut coerce_buf = Vec::with_capacity(1024);
1649
1650        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1651
1652        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1653        let mut coerce_writer =
1654            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1655
1656        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1657
1658        let original = RecordBatch::try_new(
1659            schema,
1660            vec![
1661                // small-date64
1662                Arc::new(Date64Array::from(vec![
1663                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1664                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1665                    0,
1666                    1_000 * NUM_MILLISECONDS_IN_DAY,
1667                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1668                ])),
1669                // big-date64
1670                Arc::new(Date64Array::from(vec![
1671                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1672                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1673                    0,
1674                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1675                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1676                ])),
1677                // invalid-date64
1678                Arc::new(Date64Array::from(vec![
1679                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1680                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1681                    1,
1682                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1683                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1684                ])),
1685            ],
1686        )?;
1687
1688        default_writer.write(&original)?;
1689        coerce_writer.write(&original)?;
1690
1691        default_writer.close()?;
1692        coerce_writer.close()?;
1693
1694        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1695        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1696
1697        let default_ret = default_reader.next().unwrap()?;
1698        let coerce_ret = coerce_reader.next().unwrap()?;
1699
1700        // Roundtrip should be successful when default writer used
1701        assert_eq!(default_ret, original);
1702
1703        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1704        assert_eq!(coerce_ret.column(0), original.column(0));
1705        assert_ne!(coerce_ret.column(1), original.column(1));
1706        assert_ne!(coerce_ret.column(2), original.column(2));
1707
1708        // Ensure both can be downcast to the correct type
1709        default_ret.column(0).as_primitive::<Date64Type>();
1710        coerce_ret.column(0).as_primitive::<Date64Type>();
1711
1712        Ok(())
1713    }
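
        // Editorial sketch (not part of the original test suite): why only `small-date64`
        // survives the coerce_types roundtrip in test_date64_roundtrip above. Assuming
        // `set_coerce_types(true)` stores Date64 as a whole number of days, any leftover
        // milliseconds (the `invalid-date64` column) are lost, and day counts outside the
        // i32 range (the `big-date64` column) cannot be represented at all. The test name
        // below is illustrative only.
        #[test]
        fn test_date64_coercion_loss_sketch() {
            const MS_PER_DAY: i64 = 1000 * 60 * 60 * 24;
            // A value that is not an exact multiple of a day loses its millisecond remainder.
            let not_whole_day = 1_000 * MS_PER_DAY + 1;
            assert_ne!((not_whole_day / MS_PER_DAY) * MS_PER_DAY, not_whole_day);
            // A day count of 10_000_000_000 does not fit in an i32 day representation.
            assert!(i32::try_from(10_000_000_000_i64).is_err());
        }
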
1714    struct RandFixedLenGen {}
1715
1716    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1717        fn r#gen(len: i32) -> FixedLenByteArray {
1718            let mut v = vec![0u8; len as usize];
1719            rng().fill_bytes(&mut v);
1720            ByteArray::from(v).into()
1721        }
1722    }
1723
1724    #[test]
1725    fn test_fixed_length_binary_column_reader() {
1726        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1727            20,
1728            ConvertedType::NONE,
1729            None,
1730            |vals| {
1731                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1732                for val in vals {
1733                    match val {
1734                        Some(b) => builder.append_value(b).unwrap(),
1735                        None => builder.append_null(),
1736                    }
1737                }
1738                Arc::new(builder.finish())
1739            },
1740            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1741        );
1742    }
1743
1744    #[test]
1745    fn test_interval_day_time_column_reader() {
1746        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1747            12,
1748            ConvertedType::INTERVAL,
1749            None,
1750            |vals| {
1751                Arc::new(
1752                    vals.iter()
1753                        .map(|x| {
1754                            x.as_ref().map(|b| IntervalDayTime {
1755                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1756                                milliseconds: i32::from_le_bytes(
1757                                    b.as_ref()[8..12].try_into().unwrap(),
1758                                ),
1759                            })
1760                        })
1761                        .collect::<IntervalDayTimeArray>(),
1762                )
1763            },
1764            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1765        );
1766    }
1767
1768    #[test]
1769    fn test_int96_single_column_reader_test() {
1770        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1771
1772        type TypeHintAndConversionFunction =
1773            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1774
1775        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1776            // Test without a specified ArrowType hint.
1777            (None, |vals: &[Option<Int96>]| {
1778                Arc::new(TimestampNanosecondArray::from_iter(
1779                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1780                )) as ArrayRef
1781            }),
1782            // Test other TimeUnits as ArrowType hints.
1783            (
1784                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1785                |vals: &[Option<Int96>]| {
1786                    Arc::new(TimestampSecondArray::from_iter(
1787                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1788                    )) as ArrayRef
1789                },
1790            ),
1791            (
1792                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1793                |vals: &[Option<Int96>]| {
1794                    Arc::new(TimestampMillisecondArray::from_iter(
1795                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1796                    )) as ArrayRef
1797                },
1798            ),
1799            (
1800                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1801                |vals: &[Option<Int96>]| {
1802                    Arc::new(TimestampMicrosecondArray::from_iter(
1803                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1804                    )) as ArrayRef
1805                },
1806            ),
1807            (
1808                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1809                |vals: &[Option<Int96>]| {
1810                    Arc::new(TimestampNanosecondArray::from_iter(
1811                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1812                    )) as ArrayRef
1813                },
1814            ),
1815            // Test another timezone with TimeUnit as ArrowType hints.
1816            (
1817                Some(ArrowDataType::Timestamp(
1818                    TimeUnit::Second,
1819                    Some(Arc::from("-05:00")),
1820                )),
1821                |vals: &[Option<Int96>]| {
1822                    Arc::new(
1823                        TimestampSecondArray::from_iter(
1824                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1825                        )
1826                        .with_timezone("-05:00"),
1827                    ) as ArrayRef
1828                },
1829            ),
1830        ];
1831
1832        resolutions.iter().for_each(|(arrow_type, converter)| {
1833            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1834                2,
1835                ConvertedType::NONE,
1836                arrow_type.clone(),
1837                converter,
1838                encodings,
1839            );
1840        })
1841    }
1842
1843    #[test]
1844    fn test_int96_from_spark_file_with_provided_schema() {
1845        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
1846        // resolution for range compared to a nanosecond timestamp. We must provide a schema with
1847        // microsecond resolution for the Parquet reader to interpret these values correctly.
1848        use arrow_schema::DataType::Timestamp;
1849        let test_data = arrow::util::test_util::parquet_test_data();
1850        let path = format!("{test_data}/int96_from_spark.parquet");
1851        let file = File::open(path).unwrap();
1852
1853        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1854            "a",
1855            Timestamp(TimeUnit::Microsecond, None),
1856            true,
1857        )]));
1858        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1859
1860        let mut record_reader =
1861            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1862                .unwrap()
1863                .build()
1864                .unwrap();
1865
1866        let batch = record_reader.next().unwrap().unwrap();
1867        assert_eq!(batch.num_columns(), 1);
1868        let column = batch.column(0);
1869        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1870
1871        let expected = Arc::new(Int64Array::from(vec![
1872            Some(1704141296123456),
1873            Some(1704070800000000),
1874            Some(253402225200000000),
1875            Some(1735599600000000),
1876            None,
1877            Some(9089380393200000000),
1878        ]));
1879
1880        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1881        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1882        // anyway, so this should be a zero-copy non-modifying cast.
1883
1884        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1885        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1886
1887        assert_eq!(casted_timestamps.len(), expected.len());
1888
1889        casted_timestamps
1890            .iter()
1891            .zip(expected.iter())
1892            .for_each(|(lhs, rhs)| {
1893                assert_eq!(lhs, rhs);
1894            });
1895    }
1896
1897    #[test]
1898    fn test_int96_from_spark_file_without_provided_schema() {
1899        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
1900        // resolution for range compared to a nanosecond timestamp. Without a provided schema, some
1901        // values overflow when read at nanosecond resolution and come back as garbage values.
1902        use arrow_schema::DataType::Timestamp;
1903        let test_data = arrow::util::test_util::parquet_test_data();
1904        let path = format!("{test_data}/int96_from_spark.parquet");
1905        let file = File::open(path).unwrap();
1906
1907        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1908            .unwrap()
1909            .build()
1910            .unwrap();
1911
1912        let batch = record_reader.next().unwrap().unwrap();
1913        assert_eq!(batch.num_columns(), 1);
1914        let column = batch.column(0);
1915        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1916
1917        let expected = Arc::new(Int64Array::from(vec![
1918            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1919            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1920            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1921            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1922            None,
1923            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1924        ]));
1925
1926        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1927        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1928        // anyway, so this should be a zero-copy non-modifying cast.
1929
1930        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1931        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1932
1933        assert_eq!(casted_timestamps.len(), expected.len());
1934
1935        casted_timestamps
1936            .iter()
1937            .zip(expected.iter())
1938            .for_each(|(lhs, rhs)| {
1939                assert_eq!(lhs, rhs);
1940            });
1941    }
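
        // Editorial sketch (not part of the original test suite): the arithmetic behind
        // the "cannot be represented" comments above. The year-9999 microsecond value in
        // int96_from_spark.parquet exceeds the i64 nanosecond range (which ends around
        // the year 2262), and a wrapping conversion to nanoseconds yields exactly the
        // negative value asserted above. The test name below is illustrative only.
        #[test]
        fn test_int96_nanosecond_overflow_sketch() {
            let micros: i64 = 253402225200000000; // year 9999, in microseconds
            let nanos = micros.wrapping_mul(1000);
            assert!(nanos < 0); // overflows the i64 nanosecond range
            assert_eq!(nanos, -4852191831933722624);
        }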
1942
1943    struct RandUtf8Gen {}
1944
1945    impl RandGen<ByteArrayType> for RandUtf8Gen {
1946        fn r#gen(len: i32) -> ByteArray {
1947            Int32Type::r#gen(len).to_string().as_str().into()
1948        }
1949    }
1950
1951    #[test]
1952    fn test_utf8_single_column_reader_test() {
1953        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1954            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1955                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1956            })))
1957        }
1958
1959        let encodings = &[
1960            Encoding::PLAIN,
1961            Encoding::RLE_DICTIONARY,
1962            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1963            Encoding::DELTA_BYTE_ARRAY,
1964        ];
1965
1966        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1967            2,
1968            ConvertedType::NONE,
1969            None,
1970            |vals| {
1971                Arc::new(BinaryArray::from_iter(
1972                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1973                ))
1974            },
1975            encodings,
1976        );
1977
1978        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1979            2,
1980            ConvertedType::UTF8,
1981            None,
1982            string_converter::<i32>,
1983            encodings,
1984        );
1985
1986        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1987            2,
1988            ConvertedType::UTF8,
1989            Some(ArrowDataType::Utf8),
1990            string_converter::<i32>,
1991            encodings,
1992        );
1993
1994        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1995            2,
1996            ConvertedType::UTF8,
1997            Some(ArrowDataType::LargeUtf8),
1998            string_converter::<i64>,
1999            encodings,
2000        );
2001
2002        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2003        for key in &small_key_types {
2004            for encoding in encodings {
2005                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2006                opts.encoding = *encoding;
2007
2008                let data_type =
2009                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2010
2011                // Cannot run full test suite as keys overflow, run small test instead
2012                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2013                    opts,
2014                    2,
2015                    ConvertedType::UTF8,
2016                    Some(data_type.clone()),
2017                    move |vals| {
2018                        let vals = string_converter::<i32>(vals);
2019                        arrow::compute::cast(&vals, &data_type).unwrap()
2020                    },
2021                );
2022            }
2023        }
2024
2025        let key_types = [
2026            ArrowDataType::Int16,
2027            ArrowDataType::UInt16,
2028            ArrowDataType::Int32,
2029            ArrowDataType::UInt32,
2030            ArrowDataType::Int64,
2031            ArrowDataType::UInt64,
2032        ];
2033
2034        for key in &key_types {
2035            let data_type =
2036                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2037
2038            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2039                2,
2040                ConvertedType::UTF8,
2041                Some(data_type.clone()),
2042                move |vals| {
2043                    let vals = string_converter::<i32>(vals);
2044                    arrow::compute::cast(&vals, &data_type).unwrap()
2045                },
2046                encodings,
2047            );
2048
2049            // https://github.com/apache/arrow-rs/issues/1179
2050            // let data_type = ArrowDataType::Dictionary(
2051            //     Box::new(key.clone()),
2052            //     Box::new(ArrowDataType::LargeUtf8),
2053            // );
2054            //
2055            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2056            //     2,
2057            //     ConvertedType::UTF8,
2058            //     Some(data_type.clone()),
2059            //     move |vals| {
2060            //         let vals = string_converter::<i64>(vals);
2061            //         arrow::compute::cast(&vals, &data_type).unwrap()
2062            //     },
2063            //     encodings,
2064            // );
2065        }
2066    }
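
        // Editorial sketch (not part of the original test suite): why the Int8/UInt8
        // dictionary cases in test_utf8_single_column_reader_test above use a single
        // reduced TestOptions instead of the full option matrix. An Int8 key can address
        // at most 128 distinct dictionary entries, so casting a column with more distinct
        // strings than that to Dictionary<Int8, Utf8> overflows the key type. The test
        // name below is illustrative only.
        #[test]
        fn test_small_dictionary_key_overflow_sketch() {
            // 200 distinct strings cannot be addressed by Int8 keys (128 entries max).
            let values: StringArray = (0..200).map(|i| Some(i.to_string())).collect();
            let dict_type = ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int8),
                Box::new(ArrowDataType::Utf8),
            );
            assert!(arrow::compute::cast(&values, &dict_type).is_err());
        }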
2067
2068    #[test]
2069    fn test_decimal_nullable_struct() {
2070        let decimals = Decimal256Array::from_iter_values(
2071            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2072        );
2073
2074        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2075            "decimals",
2076            decimals.data_type().clone(),
2077            false,
2078        )])))
2079        .len(8)
2080        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2081        .child_data(vec![decimals.into_data()])
2082        .build()
2083        .unwrap();
2084
2085        let written =
2086            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2087                .unwrap();
2088
2089        let mut buffer = Vec::with_capacity(1024);
2090        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2091        writer.write(&written).unwrap();
2092        writer.close().unwrap();
2093
2094        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2095            .unwrap()
2096            .collect::<Result<Vec<_>, _>>()
2097            .unwrap();
2098
2099        assert_eq!(&written.slice(0, 3), &read[0]);
2100        assert_eq!(&written.slice(3, 3), &read[1]);
2101        assert_eq!(&written.slice(6, 2), &read[2]);
2102    }
2103
2104    #[test]
2105    fn test_int32_nullable_struct() {
2106        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2107        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2108            "int32",
2109            int32.data_type().clone(),
2110            false,
2111        )])))
2112        .len(8)
2113        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2114        .child_data(vec![int32.into_data()])
2115        .build()
2116        .unwrap();
2117
2118        let written =
2119            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2120                .unwrap();
2121
2122        let mut buffer = Vec::with_capacity(1024);
2123        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2124        writer.write(&written).unwrap();
2125        writer.close().unwrap();
2126
2127        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2128            .unwrap()
2129            .collect::<Result<Vec<_>, _>>()
2130            .unwrap();
2131
2132        assert_eq!(&written.slice(0, 3), &read[0]);
2133        assert_eq!(&written.slice(3, 3), &read[1]);
2134        assert_eq!(&written.slice(6, 2), &read[2]);
2135    }
2136
2137    #[test]
2138    fn test_decimal_list() {
2139        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2140
2141        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2142        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2143            decimals.data_type().clone(),
2144            false,
2145        ))))
2146        .len(7)
2147        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2148        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2149        .child_data(vec![decimals.into_data()])
2150        .build()
2151        .unwrap();
2152
2153        let written =
2154            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2155                .unwrap();
2156
2157        let mut buffer = Vec::with_capacity(1024);
2158        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2159        writer.write(&written).unwrap();
2160        writer.close().unwrap();
2161
2162        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2163            .unwrap()
2164            .collect::<Result<Vec<_>, _>>()
2165            .unwrap();
2166
2167        assert_eq!(&written.slice(0, 3), &read[0]);
2168        assert_eq!(&written.slice(3, 3), &read[1]);
2169        assert_eq!(&written.slice(6, 1), &read[2]);
2170    }
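
        // Editorial sketch (not part of the original test suite): Arrow validity bitmaps
        // are LSB-first, so the `0b01010111` buffer in test_decimal_list above marks list
        // entries 3 and 5 as null (matching `[[], [1], [2, 3], null, [4], null, [6, 7, 8]]`),
        // and the `0b11101111` buffer in the struct tests marks entry 4 as null. The test
        // name below is illustrative only.
        #[test]
        fn test_validity_bitmap_order_sketch() {
            let bits: u8 = 0b01010111;
            let validity: Vec<bool> = (0..7).map(|i| (bits >> i) & 1 == 1).collect();
            assert_eq!(validity, vec![true, true, true, false, true, false, true]);
        }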
2171
2172    #[test]
2173    fn test_read_decimal_file() {
2174        use arrow_array::Decimal128Array;
2175        let testdata = arrow::util::test_util::parquet_test_data();
2176        let file_variants = vec![
2177            ("byte_array", 4),
2178            ("fixed_length", 25),
2179            ("int32", 4),
2180            ("int64", 10),
2181        ];
2182        for (prefix, target_precision) in file_variants {
2183            let path = format!("{testdata}/{prefix}_decimal.parquet");
2184            let file = File::open(path).unwrap();
2185            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2186
2187            let batch = record_reader.next().unwrap().unwrap();
2188            assert_eq!(batch.num_rows(), 24);
2189            let col = batch
2190                .column(0)
2191                .as_any()
2192                .downcast_ref::<Decimal128Array>()
2193                .unwrap();
2194
2195            let expected = 1..25;
2196
2197            assert_eq!(col.precision(), target_precision);
2198            assert_eq!(col.scale(), 2);
2199
2200            for (i, v) in expected.enumerate() {
2201                assert_eq!(col.value(i), v * 100_i128);
2202            }
2203        }
2204    }
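
        // Editorial sketch (not part of the original test suite): the `v * 100_i128`
        // values checked in test_read_decimal_file above are unscaled decimal
        // representations; with scale 2 an unscaled 2400 denotes the decimal value 24.00.
        // The test name below is illustrative only.
        #[test]
        fn test_decimal_unscaled_value_sketch() {
            let scale: u32 = 2;
            let unscaled: i128 = 24 * 100;
            assert_eq!(unscaled / 10_i128.pow(scale), 24);
        }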
2205
2206    #[test]
2207    fn test_read_float16_nonzeros_file() {
2208        use arrow_array::Float16Array;
2209        let testdata = arrow::util::test_util::parquet_test_data();
2210        // see https://github.com/apache/parquet-testing/pull/40
2211        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2212        let file = File::open(path).unwrap();
2213        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2214
2215        let batch = record_reader.next().unwrap().unwrap();
2216        assert_eq!(batch.num_rows(), 8);
2217        let col = batch
2218            .column(0)
2219            .as_any()
2220            .downcast_ref::<Float16Array>()
2221            .unwrap();
2222
2223        let f16_two = f16::ONE + f16::ONE;
2224
2225        assert_eq!(col.null_count(), 1);
2226        assert!(col.is_null(0));
2227        assert_eq!(col.value(1), f16::ONE);
2228        assert_eq!(col.value(2), -f16_two);
2229        assert!(col.value(3).is_nan());
2230        assert_eq!(col.value(4), f16::ZERO);
2231        assert!(col.value(4).is_sign_positive());
2232        assert_eq!(col.value(5), f16::NEG_ONE);
2233        assert_eq!(col.value(6), f16::NEG_ZERO);
2234        assert!(col.value(6).is_sign_negative());
2235        assert_eq!(col.value(7), f16_two);
2236    }
2237
2238    #[test]
2239    fn test_read_float16_zeros_file() {
2240        use arrow_array::Float16Array;
2241        let testdata = arrow::util::test_util::parquet_test_data();
2242        // see https://github.com/apache/parquet-testing/pull/40
2243        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2244        let file = File::open(path).unwrap();
2245        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2246
2247        let batch = record_reader.next().unwrap().unwrap();
2248        assert_eq!(batch.num_rows(), 3);
2249        let col = batch
2250            .column(0)
2251            .as_any()
2252            .downcast_ref::<Float16Array>()
2253            .unwrap();
2254
2255        assert_eq!(col.null_count(), 1);
2256        assert!(col.is_null(0));
2257        assert_eq!(col.value(1), f16::ZERO);
2258        assert!(col.value(1).is_sign_positive());
2259        assert!(col.value(2).is_nan());
2260    }
2261
2262    #[test]
2263    fn test_read_float32_float64_byte_stream_split() {
2264        let path = format!(
2265            "{}/byte_stream_split.zstd.parquet",
2266            arrow::util::test_util::parquet_test_data(),
2267        );
2268        let file = File::open(path).unwrap();
2269        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2270
2271        let mut row_count = 0;
2272        for batch in record_reader {
2273            let batch = batch.unwrap();
2274            row_count += batch.num_rows();
2275            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2276            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2277
2278            // This file contains floats from a standard normal distribution
2279            for &x in f32_col.values() {
2280                assert!(x > -10.0);
2281                assert!(x < 10.0);
2282            }
2283            for &x in f64_col.values() {
2284                assert!(x > -10.0);
2285                assert!(x < 10.0);
2286            }
2287        }
2288        assert_eq!(row_count, 300);
2289    }
2290
2291    #[test]
2292    fn test_read_extended_byte_stream_split() {
2293        let path = format!(
2294            "{}/byte_stream_split_extended.gzip.parquet",
2295            arrow::util::test_util::parquet_test_data(),
2296        );
2297        let file = File::open(path).unwrap();
2298        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2299
2300        let mut row_count = 0;
2301        for batch in record_reader {
2302            let batch = batch.unwrap();
2303            row_count += batch.num_rows();
2304
2305            // 0,1 are f16
2306            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2307            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2308            assert_eq!(f16_col.len(), f16_bss.len());
2309            f16_col
2310                .iter()
2311                .zip(f16_bss.iter())
2312                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2313
2314            // 2,3 are f32
2315            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2316            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2317            assert_eq!(f32_col.len(), f32_bss.len());
2318            f32_col
2319                .iter()
2320                .zip(f32_bss.iter())
2321                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2322
2323            // 4,5 are f64
2324            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2325            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2326            assert_eq!(f64_col.len(), f64_bss.len());
2327            f64_col
2328                .iter()
2329                .zip(f64_bss.iter())
2330                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2331
2332            // 6,7 are i32
2333            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2334            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2335            assert_eq!(i32_col.len(), i32_bss.len());
2336            i32_col
2337                .iter()
2338                .zip(i32_bss.iter())
2339                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2340
2341            // 8,9 are i64
2342            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2343            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2344            assert_eq!(i64_col.len(), i64_bss.len());
2345            i64_col
2346                .iter()
2347                .zip(i64_bss.iter())
2348                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2349
2350            // 10,11 are FLBA(5)
2351            let flba_col = batch.column(10).as_fixed_size_binary();
2352            let flba_bss = batch.column(11).as_fixed_size_binary();
2353            assert_eq!(flba_col.len(), flba_bss.len());
2354            flba_col
2355                .iter()
2356                .zip(flba_bss.iter())
2357                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2358
2359            // 12,13 are FLBA(4) (decimal(7,3))
2360            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2361            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2362            assert_eq!(dec_col.len(), dec_bss.len());
2363            dec_col
2364                .iter()
2365                .zip(dec_bss.iter())
2366                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2367        }
2368        assert_eq!(row_count, 200);
2369    }
2370
2371    #[test]
2372    fn test_read_incorrect_map_schema_file() {
2373        let testdata = arrow::util::test_util::parquet_test_data();
2374        // see https://github.com/apache/parquet-testing/pull/47
2375        let path = format!("{testdata}/incorrect_map_schema.parquet");
2376        let file = File::open(path).unwrap();
2377        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2378
2379        let batch = record_reader.next().unwrap().unwrap();
2380        assert_eq!(batch.num_rows(), 1);
2381
2382        let expected_schema = Schema::new(vec![Field::new(
2383            "my_map",
2384            ArrowDataType::Map(
2385                Arc::new(Field::new(
2386                    "key_value",
2387                    ArrowDataType::Struct(Fields::from(vec![
2388                        Field::new("key", ArrowDataType::Utf8, false),
2389                        Field::new("value", ArrowDataType::Utf8, true),
2390                    ])),
2391                    false,
2392                )),
2393                false,
2394            ),
2395            true,
2396        )]);
2397        assert_eq!(batch.schema().as_ref(), &expected_schema);
2398
2399        assert_eq!(batch.num_rows(), 1);
2400        assert_eq!(batch.column(0).null_count(), 0);
2401        assert_eq!(
2402            batch.column(0).as_map().keys().as_ref(),
2403            &StringArray::from(vec!["parent", "name"])
2404        );
2405        assert_eq!(
2406            batch.column(0).as_map().values().as_ref(),
2407            &StringArray::from(vec!["another", "report"])
2408        );
2409    }
2410
2411    #[test]
2412    fn test_read_dict_fixed_size_binary() {
2413        let schema = Arc::new(Schema::new(vec![Field::new(
2414            "a",
2415            ArrowDataType::Dictionary(
2416                Box::new(ArrowDataType::UInt8),
2417                Box::new(ArrowDataType::FixedSizeBinary(8)),
2418            ),
2419            true,
2420        )]));
2421        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2422        let values = FixedSizeBinaryArray::try_from_iter(
2423            vec![
2424                (0u8..8u8).collect::<Vec<u8>>(),
2425                (24u8..32u8).collect::<Vec<u8>>(),
2426            ]
2427            .into_iter(),
2428        )
2429        .unwrap();
2430        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2431        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2432
2433        let mut buffer = Vec::with_capacity(1024);
2434        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2435        writer.write(&batch).unwrap();
2436        writer.close().unwrap();
2437        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2438            .unwrap()
2439            .collect::<Result<Vec<_>, _>>()
2440            .unwrap();
2441
2442        assert_eq!(read.len(), 1);
2443        assert_eq!(&batch, &read[0])
2444    }
2445
2446    #[test]
2447    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2448        // The `StructArrayReader` checks the definition and repetition levels of the first
2449        // child column in the struct to determine the struct's nullability. If the first
2450        // column is being read by a `ByteArrayDictionaryReader`, we need to ensure that
2451        // nullability is interpreted correctly from the rep/def level buffers managed by
2452        // that array reader.
2453
2454        let struct_fields = Fields::from(vec![
2455            Field::new(
2456                "city",
2457                ArrowDataType::Dictionary(
2458                    Box::new(ArrowDataType::UInt8),
2459                    Box::new(ArrowDataType::Utf8),
2460                ),
2461                true,
2462            ),
2463            Field::new("name", ArrowDataType::Utf8, true),
2464        ]);
2465        let schema = Arc::new(Schema::new(vec![Field::new(
2466            "items",
2467            ArrowDataType::Struct(struct_fields.clone()),
2468            true,
2469        )]));
2470
2471        let items_arr = StructArray::new(
2472            struct_fields,
2473            vec![
2474                Arc::new(DictionaryArray::new(
2475                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2476                    Arc::new(StringArray::from_iter_values(vec![
2477                        "quebec",
2478                        "fredericton",
2479                        "halifax",
2480                    ])),
2481                )),
2482                Arc::new(StringArray::from_iter_values(vec![
2483                    "albert", "terry", "lance", "", "tim",
2484                ])),
2485            ],
2486            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2487        );
2488
2489        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2490        let mut buffer = Vec::with_capacity(1024);
2491        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2492        writer.write(&batch).unwrap();
2493        writer.close().unwrap();
2494        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2495            .unwrap()
2496            .collect::<Result<Vec<_>, _>>()
2497            .unwrap();
2498
2499        assert_eq!(read.len(), 1);
2500        assert_eq!(&batch, &read[0])
2501    }
2502
2503    /// Parameters for single_column_reader_test
2504    #[derive(Clone)]
2505    struct TestOptions {
2506        /// Number of row groups to write to the parquet file
2507        /// (each row group contains `num_rows` rows)
2508        num_row_groups: usize,
2509        /// Number of rows in each row group
2510        num_rows: usize,
2511        /// Size of batches to read back
2512        record_batch_size: usize,
2513        /// Percentage of nulls in the column, or None if the column is required (non-nullable)
2514        null_percent: Option<usize>,
2515        /// Set write batch size
2516        ///
2517        /// This is the number of rows that are written at once to a page and
2518        /// therefore acts as a bound on the page granularity of a row group
2519        write_batch_size: usize,
2520        /// Maximum size of page in bytes
2521        max_data_page_size: usize,
2522        /// Maximum size of dictionary page in bytes
2523        max_dict_page_size: usize,
2524        /// Writer version
2525        writer_version: WriterVersion,
2526        /// Enabled statistics
2527        enabled_statistics: EnabledStatistics,
2528        /// Encoding
2529        encoding: Encoding,
2530        /// row selections and total selected row count
2531        row_selections: Option<(RowSelection, usize)>,
2532        /// row filter
2533        row_filter: Option<Vec<bool>>,
2534        /// limit
2535        limit: Option<usize>,
2536        /// offset
2537        offset: Option<usize>,
2538    }
2539
2540    /// Manually implement `Debug` to avoid printing the entire contents of `row_selections` and `row_filter`
2541    impl std::fmt::Debug for TestOptions {
2542        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2543            f.debug_struct("TestOptions")
2544                .field("num_row_groups", &self.num_row_groups)
2545                .field("num_rows", &self.num_rows)
2546                .field("record_batch_size", &self.record_batch_size)
2547                .field("null_percent", &self.null_percent)
2548                .field("write_batch_size", &self.write_batch_size)
2549                .field("max_data_page_size", &self.max_data_page_size)
2550                .field("max_dict_page_size", &self.max_dict_page_size)
2551                .field("writer_version", &self.writer_version)
2552                .field("enabled_statistics", &self.enabled_statistics)
2553                .field("encoding", &self.encoding)
2554                .field("row_selections", &self.row_selections.is_some())
2555                .field("row_filter", &self.row_filter.is_some())
2556                .field("limit", &self.limit)
2557                .field("offset", &self.offset)
2558                .finish()
2559        }
2560    }
2561
2562    impl Default for TestOptions {
2563        fn default() -> Self {
2564            Self {
2565                num_row_groups: 2,
2566                num_rows: 100,
2567                record_batch_size: 15,
2568                null_percent: None,
2569                write_batch_size: 64,
2570                max_data_page_size: 1024 * 1024,
2571                max_dict_page_size: 1024 * 1024,
2572                writer_version: WriterVersion::PARQUET_1_0,
2573                enabled_statistics: EnabledStatistics::Page,
2574                encoding: Encoding::PLAIN,
2575                row_selections: None,
2576                row_filter: None,
2577                limit: None,
2578                offset: None,
2579            }
2580        }
2581    }
2582
2583    impl TestOptions {
2584        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2585            Self {
2586                num_row_groups,
2587                num_rows,
2588                record_batch_size,
2589                ..Default::default()
2590            }
2591        }
2592
2593        fn with_null_percent(self, null_percent: usize) -> Self {
2594            Self {
2595                null_percent: Some(null_percent),
2596                ..self
2597            }
2598        }
2599
2600        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2601            Self {
2602                max_data_page_size,
2603                ..self
2604            }
2605        }
2606
2607        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2608            Self {
2609                max_dict_page_size,
2610                ..self
2611            }
2612        }
2613
2614        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2615            Self {
2616                enabled_statistics,
2617                ..self
2618            }
2619        }
2620
2621        fn with_row_selections(self) -> Self {
2622            assert!(self.row_filter.is_none(), "Must set row selection first");
2623
2624            let mut rng = rng();
2625            let step = rng.random_range(self.record_batch_size..self.num_rows);
2626            let row_selections = create_test_selection(
2627                step,
2628                self.num_row_groups * self.num_rows,
2629                rng.random::<bool>(),
2630            );
2631            Self {
2632                row_selections: Some(row_selections),
2633                ..self
2634            }
2635        }
2636
2637        fn with_row_filter(self) -> Self {
2638            let row_count = match &self.row_selections {
2639                Some((_, count)) => *count,
2640                None => self.num_row_groups * self.num_rows,
2641            };
2642
2643            let mut rng = rng();
2644            Self {
2645                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2646                ..self
2647            }
2648        }
2649
2650        fn with_limit(self, limit: usize) -> Self {
2651            Self {
2652                limit: Some(limit),
2653                ..self
2654            }
2655        }
2656
2657        fn with_offset(self, offset: usize) -> Self {
2658            Self {
2659                offset: Some(offset),
2660                ..self
2661            }
2662        }
2663
2664        fn writer_props(&self) -> WriterProperties {
2665            let builder = WriterProperties::builder()
2666                .set_data_page_size_limit(self.max_data_page_size)
2667                .set_write_batch_size(self.write_batch_size)
2668                .set_writer_version(self.writer_version)
2669                .set_statistics_enabled(self.enabled_statistics);
2670
2671            let builder = match self.encoding {
2672                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2673                    .set_dictionary_enabled(true)
2674                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2675                _ => builder
2676                    .set_dictionary_enabled(false)
2677                    .set_encoding(self.encoding),
2678            };
2679
2680            builder.build()
2681        }
2682    }
2683
2684    /// Create a parquet file and then read it back using
2685    /// `ParquetRecordBatchReader`, driven by a standard set of
2686    /// parameters `opts`.
2687    ///
2688    /// `rand_max` represents the maximum size of value to pass to the
2689    /// value generator
2690    fn run_single_column_reader_tests<T, F, G>(
2691        rand_max: i32,
2692        converted_type: ConvertedType,
2693        arrow_type: Option<ArrowDataType>,
2694        converter: F,
2695        encodings: &[Encoding],
2696    ) where
2697        T: DataType,
2698        G: RandGen<T>,
2699        F: Fn(&[Option<T::T>]) -> ArrayRef,
2700    {
2701        let all_options = vec![
2702            // choose record_batch_size (15) so batches cross row
2703            // group boundaries (50 rows in 2 row groups)
2704            TestOptions::new(2, 100, 15),
2705            // choose record_batch_size (5) so batches sometimes fall
2706            // on row group boundaries (25 rows in 3 row groups
2707            // --> row groups of 10, 10, and 5). Tests buffer
2708            // refilling edge cases.
2709            TestOptions::new(3, 25, 5),
2710            // Choose record_batch_size (25) so all batches fall
2711            // exactly on row group boundary (25). Tests buffer
2712            // refilling edge cases.
2713            TestOptions::new(4, 100, 25),
2714            // Set maximum page size so row groups have multiple pages
2715            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2716            // Set small dictionary page size to test dictionary fallback
2717            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2718            // Test optional but with no nulls
2719            TestOptions::new(2, 256, 127).with_null_percent(0),
2720            // Test optional with nulls
2721            TestOptions::new(2, 256, 93).with_null_percent(25),
2722            // Test with limit of 0
2723            TestOptions::new(4, 100, 25).with_limit(0),
2724            // Test with limit of 50
2725            TestOptions::new(4, 100, 25).with_limit(50),
2726            // Test with limit of 10
2727            TestOptions::new(4, 100, 25).with_limit(10),
2728            // Test with limit larger than number of rows
2729            TestOptions::new(4, 100, 25).with_limit(101),
2730            // Test with limit + offset less than number of rows
2731            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2732            // Test with limit + offset equal to number of rows
2733            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2734            // Test with limit + offset larger than number of rows
2735            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2736            // Test with no page-level statistics
2737            TestOptions::new(2, 256, 91)
2738                .with_null_percent(25)
2739                .with_enabled_statistics(EnabledStatistics::Chunk),
2740            // Test with no statistics
2741            TestOptions::new(2, 256, 91)
2742                .with_null_percent(25)
2743                .with_enabled_statistics(EnabledStatistics::None),
2744            // Test with all null
2745            TestOptions::new(2, 128, 91)
2746                .with_null_percent(100)
2747                .with_enabled_statistics(EnabledStatistics::None),
2748            // Test skip
2749
2750            // Choose record_batch_size (15) so batches cross row
2751            // group boundaries (2 row groups of 100 rows each).
2752            TestOptions::new(2, 100, 15).with_row_selections(),
2753            // Choose record_batch_size (5) so batches sometimes fall
2754            // on row group boundaries (3 row groups of 25 rows each,
2755            // so every fifth batch ends on a boundary). Tests buffer
2756            // refilling edge cases.
2757            TestOptions::new(3, 25, 5).with_row_selections(),
2758            // Choose record_batch_size (25) to evenly divide the row
2759            // group size (100) so batches never straddle a row group
2760            // boundary. Tests buffer refilling edge cases.
2761            TestOptions::new(4, 100, 25).with_row_selections(),
2762            // Set maximum page size so row groups have multiple pages
2763            TestOptions::new(3, 256, 73)
2764                .with_max_data_page_size(128)
2765                .with_row_selections(),
2766            // Set small dictionary page size to test dictionary fallback
2767            TestOptions::new(3, 256, 57)
2768                .with_max_dict_page_size(128)
2769                .with_row_selections(),
2770            // Test optional but with no nulls
2771            TestOptions::new(2, 256, 127)
2772                .with_null_percent(0)
2773                .with_row_selections(),
2774            // Test optional with nulls
2775            TestOptions::new(2, 256, 93)
2776                .with_null_percent(25)
2777                .with_row_selections(),
2778            // Test optional with nulls
2779            TestOptions::new(2, 256, 93)
2780                .with_null_percent(25)
2781                .with_row_selections()
2782                .with_limit(10),
2783            // Test optional with nulls
2784            TestOptions::new(2, 256, 93)
2785                .with_null_percent(25)
2786                .with_row_selections()
2787                .with_offset(20)
2788                .with_limit(10),
2789            // Test filter
2790
2791            // Test with row filter
2792            TestOptions::new(4, 100, 25).with_row_filter(),
2793            // Test with row selection and row filter
2794            TestOptions::new(4, 100, 25)
2795                .with_row_selections()
2796                .with_row_filter(),
2797            // Test with nulls and row filter
2798            TestOptions::new(2, 256, 93)
2799                .with_null_percent(25)
2800                .with_max_data_page_size(10)
2801                .with_row_filter(),
2802            // Test with nulls and row filter and small pages
2803            TestOptions::new(2, 256, 93)
2804                .with_null_percent(25)
2805                .with_max_data_page_size(10)
2806                .with_row_selections()
2807                .with_row_filter(),
2808            // Test with row selection and no offset index and small pages
2809            TestOptions::new(2, 256, 93)
2810                .with_enabled_statistics(EnabledStatistics::None)
2811                .with_max_data_page_size(10)
2812                .with_row_selections(),
2813        ];
2814
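        // Run every option set against both writer versions and each of the
        // requested encodings.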
2815        all_options.into_iter().for_each(|opts| {
2816            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2817                for encoding in encodings {
2818                    let opts = TestOptions {
2819                        writer_version,
2820                        encoding: *encoding,
2821                        ..opts.clone()
2822                    };
2823
2824                    single_column_reader_test::<T, _, G>(
2825                        opts,
2826                        rand_max,
2827                        converted_type,
2828                        arrow_type.clone(),
2829                        &converter,
2830                    )
2831                }
2832            }
2833        });
2834    }
2835
2836    /// Create a parquet file and then read it back with
2837    /// `ParquetRecordBatchReader`, using the parameters described in
2838    /// `opts`.
2839    fn single_column_reader_test<T, F, G>(
2840        opts: TestOptions,
2841        rand_max: i32,
2842        converted_type: ConvertedType,
2843        arrow_type: Option<ArrowDataType>,
2844        converter: F,
2845    ) where
2846        T: DataType,
2847        G: RandGen<T>,
2848        F: Fn(&[Option<T::T>]) -> ArrayRef,
2849    {
2850        // Print out options to facilitate debugging failures on CI
2851        println!(
2852            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2853            T::get_physical_type(),
2854            converted_type,
2855            arrow_type,
2856            opts
2857        );
2858
2859        // Generate def_levels according to null_percent (def level 0 == null)
2860        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2861            Some(null_percent) => {
2862                let mut rng = rng();
2863
2864                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2865                    .map(|_| {
2866                        std::iter::from_fn(|| {
2867                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2868                        })
2869                        .take(opts.num_rows)
2870                        .collect()
2871                    })
2872                    .collect();
2873                (Repetition::OPTIONAL, Some(def_levels))
2874            }
2875            None => (Repetition::REQUIRED, None),
2876        };
2877
2878        // Generate random values for each row group (excluding null slots)
2879        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2880            .map(|idx| {
2881                let null_count = match def_levels.as_ref() {
2882                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2883                    None => 0,
2884                };
2885                G::gen_vec(rand_max, opts.num_rows - null_count)
2886            })
2887            .collect();
2888
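        // Physical type length: FIXED_LEN_BYTE_ARRAY uses rand_max as its byte
        // length, INT96 is always 12 bytes, and other types ignore it (-1).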
2889        let len = match T::get_physical_type() {
2890            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2891            crate::basic::Type::INT96 => 12,
2892            _ => -1,
2893        };
2894
2895        let fields = vec![Arc::new(
2896            Type::primitive_type_builder("leaf", T::get_physical_type())
2897                .with_repetition(repetition)
2898                .with_converted_type(converted_type)
2899                .with_length(len)
2900                .build()
2901                .unwrap(),
2902        )];
2903
2904        let schema = Arc::new(
2905            Type::group_type_builder("test_schema")
2906                .with_fields(fields)
2907                .build()
2908                .unwrap(),
2909        );
2910
2911        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2912
2913        let mut file = tempfile::tempfile().unwrap();
2914
2915        generate_single_column_file_with_data::<T>(
2916            &values,
2917            def_levels.as_ref(),
2918            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2919            schema,
2920            arrow_field,
2921            &opts,
2922        )
2923        .unwrap();
2924
2925        file.rewind().unwrap();
2926
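        // Only request the page index when page-level statistics were written
        // for this test case.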
2927        let options = ArrowReaderOptions::new()
2928            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2929
2930        let mut builder =
2931            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2932
2933        let expected_data = match opts.row_selections {
2934            Some((selections, row_count)) => {
2935                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2936
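                // Apply the selection to the expected data: skipped ranges are
                // discarded and selected ranges are kept (despite its name,
                // `skip_data` holds the rows that survive the selection).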
2937                let mut skip_data: Vec<Option<T::T>> = vec![];
2938                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2939                for select in dequeue {
2940                    if select.skip {
2941                        without_skip_data.drain(0..select.row_count);
2942                    } else {
2943                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2944                    }
2945                }
2946                builder = builder.with_row_selection(selections);
2947
2948                assert_eq!(skip_data.len(), row_count);
2949                skip_data
2950            }
2951            None => {
2952                // Flatten the per-row-group values into the expected data
2953                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2954                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2955                expected_data
2956            }
2957        };
2958
2959        let mut expected_data = match opts.row_filter {
2960            Some(filter) => {
2961                let expected_data = expected_data
2962                    .into_iter()
2963                    .zip(filter.iter())
2964                    .filter_map(|(d, f)| f.then_some(d))
2965                    .collect();
2966
2967                let mut filter_offset = 0;
2968                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2969                    ProjectionMask::all(),
2970                    move |b| {
2971                        let array = BooleanArray::from_iter(
2972                            filter
2973                                .iter()
2974                                .skip(filter_offset)
2975                                .take(b.num_rows())
2976                                .map(|x| Some(*x)),
2977                        );
2978                        filter_offset += b.num_rows();
2979                        Ok(array)
2980                    },
2981                ))]);
2982
2983                builder = builder.with_row_filter(filter);
2984                expected_data
2985            }
2986            None => expected_data,
2987        };
2988
2989        if let Some(offset) = opts.offset {
2990            builder = builder.with_offset(offset);
2991            expected_data = expected_data.into_iter().skip(offset).collect();
2992        }
2993
2994        if let Some(limit) = opts.limit {
2995            builder = builder.with_limit(limit);
2996            expected_data = expected_data.into_iter().take(limit).collect();
2997        }
2998
2999        let mut record_reader = builder
3000            .with_batch_size(opts.record_batch_size)
3001            .build()
3002            .unwrap();
3003
3004        let mut total_read = 0;
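        // Drain the reader, checking each batch against the corresponding
        // slice of expected data, then verify the reader is exhausted.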
3005        loop {
3006            let maybe_batch = record_reader.next();
3007            if total_read < expected_data.len() {
3008                let end = min(total_read + opts.record_batch_size, expected_data.len());
3009                let batch = maybe_batch.unwrap().unwrap();
3010                assert_eq!(end - total_read, batch.num_rows());
3011
3012                let a = converter(&expected_data[total_read..end]);
3013                let b = Arc::clone(batch.column(0));
3014
3015                assert_eq!(a.data_type(), b.data_type());
3016                assert_eq!(a.to_data(), b.to_data());
3017                assert_eq!(
3018                    a.as_any().type_id(),
3019                    b.as_any().type_id(),
3020                    "incorrect type ids"
3021                );
3022
3023                total_read = end;
3024            } else {
3025                assert!(maybe_batch.is_none());
3026                break;
3027            }
3028        }
3029    }
3030
3031    fn gen_expected_data<T: DataType>(
3032        def_levels: Option<&Vec<Vec<i16>>>,
3033        values: &[Vec<T::T>],
3034    ) -> Vec<Option<T::T>> {
3035        let data: Vec<Option<T::T>> = match def_levels {
3036            Some(levels) => {
3037                let mut values_iter = values.iter().flatten();
3038                levels
3039                    .iter()
3040                    .flatten()
3041                    .map(|d| match d {
3042                        1 => Some(values_iter.next().cloned().unwrap()),
3043                        0 => None,
3044                        _ => unreachable!(),
3045                    })
3046                    .collect()
3047            }
3048            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3049        };
3050        data
3051    }
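
    /// Minimal illustrative check of `gen_expected_data` (example values are
    /// arbitrary): definition level 1 consumes the next value, definition
    /// level 0 yields a null.
    #[test]
    fn test_gen_expected_data_interleaves_nulls() {
        let def_levels = vec![vec![1, 0, 1, 0]];
        let values = vec![vec![7, 8]];
        let expected = gen_expected_data::<Int32Type>(Some(&def_levels), &values);
        assert_eq!(expected, vec![Some(7), None, Some(8), None]);
    }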
3052
3053    fn generate_single_column_file_with_data<T: DataType>(
3054        values: &[Vec<T::T>],
3055        def_levels: Option<&Vec<Vec<i16>>>,
3056        file: File,
3057        schema: TypePtr,
3058        field: Option<Field>,
3059        opts: &TestOptions,
3060    ) -> Result<ParquetMetaData> {
3061        let mut writer_props = opts.writer_props();
3062        if let Some(field) = field {
3063            let arrow_schema = Schema::new(vec![field]);
3064            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3065        }
3066
3067        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3068
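        // Write one row group per outer `values` entry, pairing it with the
        // matching definition levels (if any).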
3069        for (idx, v) in values.iter().enumerate() {
3070            let def_levels = def_levels.map(|d| d[idx].as_slice());
3071            let mut row_group_writer = writer.next_row_group()?;
3072            {
3073                let mut column_writer = row_group_writer
3074                    .next_column()?
3075                    .expect("Column writer is none!");
3076
3077                column_writer
3078                    .typed::<T>()
3079                    .write_batch(v, def_levels, None)?;
3080
3081                column_writer.close()?;
3082            }
3083            row_group_writer.close()?;
3084        }
3085
3086        writer.close()
3087    }
3088
3089    fn get_test_file(file_name: &str) -> File {
3090        let mut path = PathBuf::new();
3091        path.push(arrow::util::test_util::arrow_test_data());
3092        path.push(file_name);
3093
3094        File::open(path.as_path()).expect("File not found!")
3095    }
3096
3097    #[test]
3098    fn test_read_structs() {
3099        // This particular test file has columns of struct types where there is
3100        // a column that has the same name as one of the struct fields
3101        // (see: ARROW-11452)
3102        let testdata = arrow::util::test_util::parquet_test_data();
3103        let path = format!("{testdata}/nested_structs.rust.parquet");
3104        let file = File::open(&path).unwrap();
3105        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3106
3107        for batch in record_batch_reader {
3108            batch.unwrap();
3109        }
3110
3111        let file = File::open(&path).unwrap();
3112        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3113
3114        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3115        let projected_reader = builder
3116            .with_projection(mask)
3117            .with_batch_size(60)
3118            .build()
3119            .unwrap();
3120
3121        let expected_schema = Schema::new(vec![
3122            Field::new(
3123                "roll_num",
3124                ArrowDataType::Struct(Fields::from(vec![Field::new(
3125                    "count",
3126                    ArrowDataType::UInt64,
3127                    false,
3128                )])),
3129                false,
3130            ),
3131            Field::new(
3132                "PC_CUR",
3133                ArrowDataType::Struct(Fields::from(vec![
3134                    Field::new("mean", ArrowDataType::Int64, false),
3135                    Field::new("sum", ArrowDataType::Int64, false),
3136                ])),
3137                false,
3138            ),
3139        ]);
3140
3141        // Tests for #1652 and #1654
3142        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3143
3144        for batch in projected_reader {
3145            let batch = batch.unwrap();
3146            assert_eq!(batch.schema().as_ref(), &expected_schema);
3147        }
3148    }
3149
3150    #[test]
3151    // same as test_read_structs but constructs projection mask via column names
3152    fn test_read_structs_by_name() {
3153        let testdata = arrow::util::test_util::parquet_test_data();
3154        let path = format!("{testdata}/nested_structs.rust.parquet");
3155        let file = File::open(&path).unwrap();
3156        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3157
3158        for batch in record_batch_reader {
3159            batch.unwrap();
3160        }
3161
3162        let file = File::open(&path).unwrap();
3163        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3164
3165        let mask = ProjectionMask::columns(
3166            builder.parquet_schema(),
3167            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3168        );
3169        let projected_reader = builder
3170            .with_projection(mask)
3171            .with_batch_size(60)
3172            .build()
3173            .unwrap();
3174
3175        let expected_schema = Schema::new(vec![
3176            Field::new(
3177                "roll_num",
3178                ArrowDataType::Struct(Fields::from(vec![Field::new(
3179                    "count",
3180                    ArrowDataType::UInt64,
3181                    false,
3182                )])),
3183                false,
3184            ),
3185            Field::new(
3186                "PC_CUR",
3187                ArrowDataType::Struct(Fields::from(vec![
3188                    Field::new("mean", ArrowDataType::Int64, false),
3189                    Field::new("sum", ArrowDataType::Int64, false),
3190                ])),
3191                false,
3192            ),
3193        ]);
3194
3195        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3196
3197        for batch in projected_reader {
3198            let batch = batch.unwrap();
3199            assert_eq!(batch.schema().as_ref(), &expected_schema);
3200        }
3201    }
3202
3203    #[test]
3204    fn test_read_maps() {
3205        let testdata = arrow::util::test_util::parquet_test_data();
3206        let path = format!("{testdata}/nested_maps.snappy.parquet");
3207        let file = File::open(path).unwrap();
3208        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3209
3210        for batch in record_batch_reader {
3211            batch.unwrap();
3212        }
3213    }
3214
3215    #[test]
3216    fn test_nested_nullability() {
3217        let message_type = "message nested {
3218          OPTIONAL Group group {
3219            REQUIRED INT32 leaf;
3220          }
3221        }";
3222
3223        let file = tempfile::tempfile().unwrap();
3224        let schema = Arc::new(parse_message_type(message_type).unwrap());
3225
3226        {
3227            // Write using low-level parquet API (#1167)
3228            let mut writer =
3229                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3230                    .unwrap();
3231
3232            {
3233                let mut row_group_writer = writer.next_row_group().unwrap();
3234                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3235
3236                column_writer
3237                    .typed::<Int32Type>()
3238                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3239                    .unwrap();
3240
3241                column_writer.close().unwrap();
3242                row_group_writer.close().unwrap();
3243            }
3244
3245            writer.close().unwrap();
3246        }
3247
3248        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3249        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3250
3251        let reader = builder.with_projection(mask).build().unwrap();
3252
3253        let expected_schema = Schema::new(vec![Field::new(
3254            "group",
3255            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3256            true,
3257        )]);
3258
3259        let batch = reader.into_iter().next().unwrap().unwrap();
3260        assert_eq!(batch.schema().as_ref(), &expected_schema);
3261        assert_eq!(batch.num_rows(), 4);
3262        assert_eq!(batch.column(0).null_count(), 2);
3263    }
3264
3265    #[test]
3266    fn test_invalid_utf8() {
3267        // a parquet file with 1 column with invalid utf8
3268        let data = vec![
3269            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3270            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3271            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3272            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3273            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3274            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3275            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3276            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3277            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3278            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3279            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3280            82, 49,
3281        ];
3282
3283        let file = Bytes::from(data);
3284        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3285
3286        let error = record_batch_reader.next().unwrap().unwrap_err();
3287
3288        assert!(
3289            error.to_string().contains("invalid utf-8 sequence"),
3290            "{}",
3291            error
3292        );
3293    }
3294
3295    #[test]
3296    fn test_invalid_utf8_string_array() {
3297        test_invalid_utf8_string_array_inner::<i32>();
3298    }
3299
3300    #[test]
3301    fn test_invalid_utf8_large_string_array() {
3302        test_invalid_utf8_string_array_inner::<i64>();
3303    }
3304
3305    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3306        let cases = [
3307            invalid_utf8_first_char::<O>(),
3308            invalid_utf8_first_char_long_strings::<O>(),
3309            invalid_utf8_later_char::<O>(),
3310            invalid_utf8_later_char_long_strings::<O>(),
3311            invalid_utf8_later_char_really_long_strings::<O>(),
3312            invalid_utf8_later_char_really_long_strings2::<O>(),
3313        ];
3314        for array in &cases {
3315            for encoding in STRING_ENCODINGS {
3316                // The data is not valid UTF-8, so we cannot safely construct a
3317                // correct StringArray; purposely create an invalid one instead
3318                let array = unsafe {
3319                    GenericStringArray::<O>::new_unchecked(
3320                        array.offsets().clone(),
3321                        array.values().clone(),
3322                        array.nulls().cloned(),
3323                    )
3324                };
3325                let data_type = array.data_type().clone();
3326                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3327                let err = read_from_parquet(data).unwrap_err();
3328                let expected_err =
3329                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3330                assert!(
3331                    err.to_string().contains(expected_err),
3332                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3333                );
3334            }
3335        }
3336    }
3337
3338    #[test]
3339    fn test_invalid_utf8_string_view_array() {
3340        let cases = [
3341            invalid_utf8_first_char::<i32>(),
3342            invalid_utf8_first_char_long_strings::<i32>(),
3343            invalid_utf8_later_char::<i32>(),
3344            invalid_utf8_later_char_long_strings::<i32>(),
3345            invalid_utf8_later_char_really_long_strings::<i32>(),
3346            invalid_utf8_later_char_really_long_strings2::<i32>(),
3347        ];
3348
3349        for encoding in STRING_ENCODINGS {
3350            for array in &cases {
3351                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3352                let array = array.as_binary_view();
3353
3354                // The data is not valid UTF-8, so we cannot safely construct a
3355                // correct StringViewArray; purposely create an invalid one instead
3356                let array = unsafe {
3357                    StringViewArray::new_unchecked(
3358                        array.views().clone(),
3359                        array.data_buffers().to_vec(),
3360                        array.nulls().cloned(),
3361                    )
3362                };
3363
3364                let data_type = array.data_type().clone();
3365                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3366                let err = read_from_parquet(data).unwrap_err();
3367                let expected_err =
3368                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3369                assert!(
3370                    err.to_string().contains(expected_err),
3371                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3372                );
3373            }
3374        }
3375    }
3376
3377    /// Encodings suitable for string data
3378    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3379        None,
3380        Some(Encoding::PLAIN),
3381        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3382        Some(Encoding::DELTA_BYTE_ARRAY),
3383    ];
3384
3385    /// Invalid Utf-8 sequence in the first character
3386    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3387    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3388
3389    /// Invalid UTF-8 sequence in a character other than the first
3390    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3391    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3392
3393    /// returns a BinaryArray with invalid UTF8 data in the first character
3394    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3395        let valid: &[u8] = b"   ";
3396        let invalid = INVALID_UTF8_FIRST_CHAR;
3397        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3398    }
3399
3400    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3401    /// string larger than 12 bytes which is handled specially when reading
3402    /// `ByteViewArray`s
3403    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3404        let valid: &[u8] = b"   ";
3405        let mut invalid = vec![];
3406        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3407        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3408        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3409    }
3410
3411    /// returns a BinaryArray with invalid UTF8 data in a character other than
3412    /// the first (this is checked in a special codepath)
3413    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3414        let valid: &[u8] = b"   ";
3415        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3416        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3417    }
3418
3419    /// returns a BinaryArray with invalid UTF8 data in a character other than
3420    /// the first in a string larger than 12 bytes which is handled specially
3421    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3422    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3423        let valid: &[u8] = b"   ";
3424        let mut invalid = vec![];
3425        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3426        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3427        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3428    }
3429
3430    /// returns a BinaryArray with invalid UTF8 data in a character other than
3431    /// the first in a string larger than 128 bytes which is handled specially
3432    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3433    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3434        let valid: &[u8] = b"   ";
3435        let mut invalid = vec![];
3436        for _ in 0..10 {
3437            // each instance is 38 bytes
3438            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3439        }
3440        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3441        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3442    }
3443
3444    /// returns a BinaryArray with a small invalid UTF8 value followed by a long
3445    /// valid string (larger than 128 bytes), mixing small and large values
3446    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3447        let valid: &[u8] = b"   ";
3448        let mut valid_long = vec![];
3449        for _ in 0..10 {
3450            // each instance is 38 bytes
3451            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3452        }
3453        let invalid = INVALID_UTF8_LATER_CHAR;
3454        GenericBinaryArray::<O>::from_iter(vec![
3455            None,
3456            Some(valid),
3457            Some(invalid),
3458            None,
3459            Some(&valid_long),
3460            Some(valid),
3461        ])
3462    }
3463
3464    /// Writes the array into a single-column parquet file with the specified
3465    /// encoding.
3466    ///
3467    /// If no encoding is specified, the default (dictionary) encoding is used.
3468    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3469        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3470        let mut data = vec![];
3471        let schema = batch.schema();
3472        let props = encoding.map(|encoding| {
3473            WriterProperties::builder()
3474                // Dictionary encoding must be disabled for the requested encoding to take effect
3475                .set_dictionary_enabled(false)
3476                .set_encoding(encoding)
3477                .build()
3478        });
3479
3480        {
3481            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3482            writer.write(&batch).unwrap();
3483            writer.flush().unwrap();
3484            writer.close().unwrap();
3485        };
3486        data
3487    }
3488
3489    /// Reads the parquet data back into record batches
3490    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3491        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3492            .unwrap()
3493            .build()
3494            .unwrap();
3495
3496        reader.collect()
3497    }
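
    /// Minimal illustrative round trip (example values are arbitrary): valid
    /// UTF-8 strings written through `write_to_parquet_with_encoding` should
    /// read back without error for every encoding in `STRING_ENCODINGS`.
    #[test]
    fn test_valid_utf8_roundtrip_string_encodings() {
        for encoding in STRING_ENCODINGS {
            let array = StringArray::from_iter_values(["foo", "bar", "baz"]);
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let batches = read_from_parquet(data).unwrap();
            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
            assert_eq!(total_rows, 3);
        }
    }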
3498
3499    #[test]
3500    fn test_dictionary_preservation() {
3501        let fields = vec![Arc::new(
3502            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3503                .with_repetition(Repetition::OPTIONAL)
3504                .with_converted_type(ConvertedType::UTF8)
3505                .build()
3506                .unwrap(),
3507        )];
3508
3509        let schema = Arc::new(
3510            Type::group_type_builder("test_schema")
3511                .with_fields(fields)
3512                .build()
3513                .unwrap(),
3514        );
3515
3516        let dict_type = ArrowDataType::Dictionary(
3517            Box::new(ArrowDataType::Int32),
3518            Box::new(ArrowDataType::Utf8),
3519        );
3520
3521        let arrow_field = Field::new("leaf", dict_type, true);
3522
3523        let mut file = tempfile::tempfile().unwrap();
3524
3525        let values = vec![
3526            vec![
3527                ByteArray::from("hello"),
3528                ByteArray::from("a"),
3529                ByteArray::from("b"),
3530                ByteArray::from("d"),
3531            ],
3532            vec![
3533                ByteArray::from("c"),
3534                ByteArray::from("a"),
3535                ByteArray::from("b"),
3536            ],
3537        ];
3538
3539        let def_levels = vec![
3540            vec![1, 0, 0, 1, 0, 0, 1, 1],
3541            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3542        ];
3543
3544        let opts = TestOptions {
3545            encoding: Encoding::RLE_DICTIONARY,
3546            ..Default::default()
3547        };
3548
3549        generate_single_column_file_with_data::<ByteArrayType>(
3550            &values,
3551            Some(&def_levels),
3552            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3553            schema,
3554            Some(arrow_field),
3555            &opts,
3556        )
3557        .unwrap();
3558
3559        file.rewind().unwrap();
3560
3561        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3562
3563        let batches = record_reader
3564            .collect::<Result<Vec<RecordBatch>, _>>()
3565            .unwrap();
3566
3567        assert_eq!(batches.len(), 6);
3568        assert!(batches.iter().all(|x| x.num_columns() == 1));
3569
3570        let row_counts = batches
3571            .iter()
3572            .map(|x| (x.num_rows(), x.column(0).null_count()))
3573            .collect::<Vec<_>>();
3574
3575        assert_eq!(
3576            row_counts,
3577            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3578        );
3579
3580        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3581
3582        // First and second batch in same row group -> same dictionary
3583        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3584        // Third batch spans a row group boundary -> dictionary is recomputed
3585        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3586        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3587        // Fourth, fifth and sixth from same row group -> same dictionary
3588        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3589        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3590    }
3591
3592    #[test]
3593    fn test_read_null_list() {
3594        let testdata = arrow::util::test_util::parquet_test_data();
3595        let path = format!("{testdata}/null_list.parquet");
3596        let file = File::open(path).unwrap();
3597        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3598
3599        let batch = record_batch_reader.next().unwrap().unwrap();
3600        assert_eq!(batch.num_rows(), 1);
3601        assert_eq!(batch.num_columns(), 1);
3602        assert_eq!(batch.column(0).len(), 1);
3603
3604        let list = batch
3605            .column(0)
3606            .as_any()
3607            .downcast_ref::<ListArray>()
3608            .unwrap();
3609        assert_eq!(list.len(), 1);
3610        assert!(list.is_valid(0));
3611
3612        let val = list.value(0);
3613        assert_eq!(val.len(), 0);
3614    }
3615
3616    #[test]
3617    fn test_null_schema_inference() {
3618        let testdata = arrow::util::test_util::parquet_test_data();
3619        let path = format!("{testdata}/null_list.parquet");
3620        let file = File::open(path).unwrap();
3621
3622        let arrow_field = Field::new(
3623            "emptylist",
3624            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3625            true,
3626        );
3627
3628        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3629        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3630        let schema = builder.schema();
3631        assert_eq!(schema.fields().len(), 1);
3632        assert_eq!(schema.field(0), &arrow_field);
3633    }
3634
3635    #[test]
3636    fn test_skip_metadata() {
3637        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3638        let field = Field::new("col", col.data_type().clone(), true);
3639
3640        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3641
3642        let metadata = [("key".to_string(), "value".to_string())]
3643            .into_iter()
3644            .collect();
3645
3646        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3647
3648        assert_ne!(schema_with_metadata, schema_without_metadata);
3649
3650        let batch =
3651            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3652
3653        let file = |version: WriterVersion| {
3654            let props = WriterProperties::builder()
3655                .set_writer_version(version)
3656                .build();
3657
3658            let file = tempfile().unwrap();
3659            let mut writer =
3660                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3661                    .unwrap();
3662            writer.write(&batch).unwrap();
3663            writer.close().unwrap();
3664            file
3665        };
3666
3667        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3668
3669        let v1_reader = file(WriterVersion::PARQUET_1_0);
3670        let v2_reader = file(WriterVersion::PARQUET_2_0);
3671
3672        let arrow_reader =
3673            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3674        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3675
3676        let reader =
3677            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3678                .unwrap()
3679                .build()
3680                .unwrap();
3681        assert_eq!(reader.schema(), schema_without_metadata);
3682
3683        let arrow_reader =
3684            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3685        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3686
3687        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3688            .unwrap()
3689            .build()
3690            .unwrap();
3691        assert_eq!(reader.schema(), schema_without_metadata);
3692    }
3693
3694    fn write_parquet_from_iter<I, F>(value: I) -> File
3695    where
3696        I: IntoIterator<Item = (F, ArrayRef)>,
3697        F: AsRef<str>,
3698    {
3699        let batch = RecordBatch::try_from_iter(value).unwrap();
3700        let file = tempfile().unwrap();
3701        let mut writer =
3702            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3703        writer.write(&batch).unwrap();
3704        writer.close().unwrap();
3705        file
3706    }
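
    /// Minimal illustrative check (example values are arbitrary): a file
    /// produced by `write_parquet_from_iter` exposes the written column name
    /// through the reader builder's inferred Arrow schema.
    #[test]
    fn test_write_parquet_from_iter_preserves_column_name() {
        let file = write_parquet_from_iter(vec![(
            "int64",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(builder.schema().field(0).name(), "int64");
    }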
3707
3708    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3709    where
3710        I: IntoIterator<Item = (F, ArrayRef)>,
3711        F: AsRef<str>,
3712    {
3713        let file = write_parquet_from_iter(value);
3714        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3715        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3716            file.try_clone().unwrap(),
3717            options_with_schema,
3718        );
3719        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3720    }
3721
3722    #[test]
3723    fn test_schema_too_few_columns() {
3724        run_schema_test_with_error(
3725            vec![
3726                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3727                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3728            ],
3729            Arc::new(Schema::new(vec![Field::new(
3730                "int64",
3731                ArrowDataType::Int64,
3732                false,
3733            )])),
3734            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3735        );
3736    }
3737
3738    #[test]
3739    fn test_schema_too_many_columns() {
3740        run_schema_test_with_error(
3741            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3742            Arc::new(Schema::new(vec![
3743                Field::new("int64", ArrowDataType::Int64, false),
3744                Field::new("int32", ArrowDataType::Int32, false),
3745            ])),
3746            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3747        );
3748    }
3749
3750    #[test]
3751    fn test_schema_mismatched_column_names() {
3752        run_schema_test_with_error(
3753            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3754            Arc::new(Schema::new(vec![Field::new(
3755                "other",
3756                ArrowDataType::Int64,
3757                false,
3758            )])),
3759            "Arrow: incompatible arrow schema, expected field named int64 got other",
3760        );
3761    }
3762
3763    #[test]
3764    fn test_schema_incompatible_columns() {
3765        run_schema_test_with_error(
3766            vec![
3767                (
3768                    "col1_invalid",
3769                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3770                ),
3771                (
3772                    "col2_valid",
3773                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3774                ),
3775                (
3776                    "col3_invalid",
3777                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3778                ),
3779            ],
3780            Arc::new(Schema::new(vec![
3781                Field::new("col1_invalid", ArrowDataType::Int32, false),
3782                Field::new("col2_valid", ArrowDataType::Int32, false),
3783                Field::new("col3_invalid", ArrowDataType::Int32, false),
3784            ])),
3785            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3786        );
3787    }
3788
3789    #[test]
3790    fn test_one_incompatible_nested_column() {
3791        let nested_fields = Fields::from(vec![
3792            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3793            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3794        ]);
3795        let nested = StructArray::try_new(
3796            nested_fields,
3797            vec![
3798                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3799                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3800            ],
3801            None,
3802        )
3803        .expect("struct array");
3804        let supplied_nested_fields = Fields::from(vec![
3805            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3806            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3807        ]);
3808        run_schema_test_with_error(
3809            vec![
3810                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3811                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3812                ("nested", Arc::new(nested) as ArrayRef),
3813            ],
3814            Arc::new(Schema::new(vec![
3815                Field::new("col1", ArrowDataType::Int64, false),
3816                Field::new("col2", ArrowDataType::Int32, false),
3817                Field::new(
3818                    "nested",
3819                    ArrowDataType::Struct(supplied_nested_fields),
3820                    false,
3821                ),
3822            ])),
3823            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3824            requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
3825            but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
3826        );
3827    }
3828
3829    /// Return parquet data with a single column of utf8 strings
3830    fn utf8_parquet() -> Bytes {
3831        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3832        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3833        let props = None;
3834        // Write a parquet file with non-nullable strings
3835        let mut parquet_data = vec![];
3836        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3837        writer.write(&batch).unwrap();
3838        writer.close().unwrap();
3839        Bytes::from(parquet_data)
3840    }
3841
3842    #[test]
3843    fn test_schema_error_bad_types() {
3844        // verify incompatible schemas error on read
3845        let parquet_data = utf8_parquet();
3846
3847        // Ask to read it back with an incompatible schema (int vs string)
3848        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3849            "column1",
3850            arrow::datatypes::DataType::Int32,
3851            false,
3852        )]));
3853
3854        // read it back out
3855        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3856        let err =
3857            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3858                .unwrap_err();
3859        assert_eq!(
3860            err.to_string(),
3861            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
3862        )
3863    }
3864
3865    #[test]
3866    fn test_schema_error_bad_nullability() {
3867        // verify incompatible schemas error on read
3868        let parquet_data = utf8_parquet();
3869
3870        // Ask to read it back with an incompatible schema (nullability mismatch)
3871        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3872            "column1",
3873            arrow::datatypes::DataType::Utf8,
3874            true,
3875        )]));
3876
3877        // read it back out
3878        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3879        let err =
3880            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3881                .unwrap_err();
3882        assert_eq!(
3883            err.to_string(),
3884            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
3885        )
3886    }
3887
3888    #[test]
3889    fn test_read_binary_as_utf8() {
3890        let file = write_parquet_from_iter(vec![
3891            (
3892                "binary_to_utf8",
3893                Arc::new(BinaryArray::from(vec![
3894                    b"one".as_ref(),
3895                    b"two".as_ref(),
3896                    b"three".as_ref(),
3897                ])) as ArrayRef,
3898            ),
3899            (
3900                "large_binary_to_large_utf8",
3901                Arc::new(LargeBinaryArray::from(vec![
3902                    b"one".as_ref(),
3903                    b"two".as_ref(),
3904                    b"three".as_ref(),
3905                ])) as ArrayRef,
3906            ),
3907            (
3908                "binary_view_to_utf8_view",
3909                Arc::new(BinaryViewArray::from(vec![
3910                    b"one".as_ref(),
3911                    b"two".as_ref(),
3912                    b"three".as_ref(),
3913                ])) as ArrayRef,
3914            ),
3915        ]);
3916        let supplied_fields = Fields::from(vec![
3917            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3918            Field::new(
3919                "large_binary_to_large_utf8",
3920                ArrowDataType::LargeUtf8,
3921                false,
3922            ),
3923            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3924        ]);
3925
3926        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3927        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3928            file.try_clone().unwrap(),
3929            options,
3930        )
3931        .expect("reader builder with schema")
3932        .build()
3933        .expect("reader with schema");
3934
3935        let batch = arrow_reader.next().unwrap().unwrap();
3936        assert_eq!(batch.num_columns(), 3);
3937        assert_eq!(batch.num_rows(), 3);
3938        assert_eq!(
3939            batch
3940                .column(0)
3941                .as_string::<i32>()
3942                .iter()
3943                .collect::<Vec<_>>(),
3944            vec![Some("one"), Some("two"), Some("three")]
3945        );
3946
3947        assert_eq!(
3948            batch
3949                .column(1)
3950                .as_string::<i64>()
3951                .iter()
3952                .collect::<Vec<_>>(),
3953            vec![Some("one"), Some("two"), Some("three")]
3954        );
3955
3956        assert_eq!(
3957            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3958            vec![Some("one"), Some("two"), Some("three")]
3959        );
3960    }
3961
3962    #[test]
3963    #[should_panic(expected = "Invalid UTF8 sequence at")]
3964    fn test_read_non_utf8_binary_as_utf8() {
3965        let file = write_parquet_from_iter(vec![(
3966            "non_utf8_binary",
3967            Arc::new(BinaryArray::from(vec![
3968                b"\xDE\x00\xFF".as_ref(),
3969                b"\xDE\x01\xAA".as_ref(),
3970                b"\xDE\x02\xFF".as_ref(),
3971            ])) as ArrayRef,
3972        )]);
3973        let supplied_fields = Fields::from(vec![Field::new(
3974            "non_utf8_binary",
3975            ArrowDataType::Utf8,
3976            false,
3977        )]);
3978
3979        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3980        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3981            file.try_clone().unwrap(),
3982            options,
3983        )
3984        .expect("reader builder with schema")
3985        .build()
3986        .expect("reader with schema");
3987        arrow_reader.next().unwrap().unwrap_err();
3988    }
3989
3990    #[test]
3991    fn test_with_schema() {
3992        let nested_fields = Fields::from(vec![
3993            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3994            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3995        ]);
3996
3997        let nested_arrays: Vec<ArrayRef> = vec![
3998            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3999            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4000        ];
4001
4002        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4003
4004        let file = write_parquet_from_iter(vec![
4005            (
4006                "int32_to_ts_second",
4007                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4008            ),
4009            (
4010                "date32_to_date64",
4011                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4012            ),
4013            ("nested", Arc::new(nested) as ArrayRef),
4014        ]);
4015
4016        let supplied_nested_fields = Fields::from(vec![
4017            Field::new(
4018                "utf8_to_dict",
4019                ArrowDataType::Dictionary(
4020                    Box::new(ArrowDataType::Int32),
4021                    Box::new(ArrowDataType::Utf8),
4022                ),
4023                false,
4024            ),
4025            Field::new(
4026                "int64_to_ts_nano",
4027                ArrowDataType::Timestamp(
4028                    arrow::datatypes::TimeUnit::Nanosecond,
4029                    Some("+10:00".into()),
4030                ),
4031                false,
4032            ),
4033        ]);
4034
4035        let supplied_schema = Arc::new(Schema::new(vec![
4036            Field::new(
4037                "int32_to_ts_second",
4038                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4039                false,
4040            ),
4041            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4042            Field::new(
4043                "nested",
4044                ArrowDataType::Struct(supplied_nested_fields),
4045                false,
4046            ),
4047        ]));
4048
4049        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4050        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4051            file.try_clone().unwrap(),
4052            options,
4053        )
4054        .expect("reader builder with schema")
4055        .build()
4056        .expect("reader with schema");
4057
4058        assert_eq!(arrow_reader.schema(), supplied_schema);
4059        let batch = arrow_reader.next().unwrap().unwrap();
4060        assert_eq!(batch.num_columns(), 3);
4061        assert_eq!(batch.num_rows(), 4);
4062        assert_eq!(
4063            batch
4064                .column(0)
4065                .as_any()
4066                .downcast_ref::<TimestampSecondArray>()
4067                .expect("downcast to timestamp second")
4068                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4069                .map(|v| v.to_string())
4070                .expect("value as datetime"),
4071            "1970-01-01 01:00:00 +01:00"
4072        );
4073        assert_eq!(
4074            batch
4075                .column(1)
4076                .as_any()
4077                .downcast_ref::<Date64Array>()
4078                .expect("downcast to date64")
4079                .value_as_date(0)
4080                .map(|v| v.to_string())
4081                .expect("value as date"),
4082            "1970-01-01"
4083        );
4084
4085        let nested = batch
4086            .column(2)
4087            .as_any()
4088            .downcast_ref::<StructArray>()
4089            .expect("downcast to struct");
4090
4091        let nested_dict = nested
4092            .column(0)
4093            .as_any()
4094            .downcast_ref::<Int32DictionaryArray>()
4095            .expect("downcast to dictionary");
4096
4097        assert_eq!(
4098            nested_dict
4099                .values()
4100                .as_any()
4101                .downcast_ref::<StringArray>()
4102                .expect("downcast to string")
4103                .iter()
4104                .collect::<Vec<_>>(),
4105            vec![Some("a"), Some("b")]
4106        );
4107
4108        assert_eq!(
4109            nested_dict.keys().iter().collect::<Vec<_>>(),
4110            vec![Some(0), Some(0), Some(0), Some(1)]
4111        );
4112
4113        assert_eq!(
4114            nested
4115                .column(1)
4116                .as_any()
4117                .downcast_ref::<TimestampNanosecondArray>()
4118                .expect("downcast to timestamp nanosecond")
4119                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4120                .map(|v| v.to_string())
4121                .expect("value as datetime"),
4122            "1970-01-01 10:00:00.000000001 +10:00"
4123        );
4124    }
4125
4126    #[test]
4127    fn test_empty_projection() {
4128        let testdata = arrow::util::test_util::parquet_test_data();
4129        let path = format!("{testdata}/alltypes_plain.parquet");
4130        let file = File::open(path).unwrap();
4131
4132        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4133        let file_metadata = builder.metadata().file_metadata();
4134        let expected_rows = file_metadata.num_rows() as usize;
4135
4136        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4137        let batch_reader = builder
4138            .with_projection(mask)
4139            .with_batch_size(2)
4140            .build()
4141            .unwrap();
4142
4143        let mut total_rows = 0;
4144        for maybe_batch in batch_reader {
4145            let batch = maybe_batch.unwrap();
4146            total_rows += batch.num_rows();
4147            assert_eq!(batch.num_columns(), 0);
4148            assert!(batch.num_rows() <= 2);
4149        }
4150
4151        assert_eq!(total_rows, expected_rows);
4152    }
4153
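    /// Writes two batches of `batch_size` empty lists into a file whose row groups are capped at
    /// `row_group_size` rows, then reads the file back with the same `batch_size` and checks that
    /// the first two batches returned each contain `batch_size` rows.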
4154    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4155        let schema = Arc::new(Schema::new(vec![Field::new(
4156            "list",
4157            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4158            true,
4159        )]));
4160
4161        let mut buf = Vec::with_capacity(1024);
4162
4163        let mut writer = ArrowWriter::try_new(
4164            &mut buf,
4165            schema.clone(),
4166            Some(
4167                WriterProperties::builder()
4168                    .set_max_row_group_size(row_group_size)
4169                    .build(),
4170            ),
4171        )
4172        .unwrap();
4173        for _ in 0..2 {
4174            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4175            for _ in 0..(batch_size) {
4176                list_builder.append(true);
4177            }
4178            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4179                .unwrap();
4180            writer.write(&batch).unwrap();
4181        }
4182        writer.close().unwrap();
4183
4184        let mut record_reader =
4185            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4186        assert_eq!(
4187            batch_size,
4188            record_reader.next().unwrap().unwrap().num_rows()
4189        );
4190        assert_eq!(
4191            batch_size,
4192            record_reader.next().unwrap().unwrap().num_rows()
4193        );
4194    }
4195
4196    #[test]
4197    fn test_row_group_exact_multiple() {
4198        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4199        test_row_group_batch(8, 8);
4200        test_row_group_batch(10, 8);
4201        test_row_group_batch(8, 10);
4202        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4203        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4204        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4205        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4206        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4207    }
4208
4209    /// Given a `RecordBatch` containing all of the column data, return the batches expected
4210    /// when reading it back with the given `batch_size` and `selection`.
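    ///
    /// For example, with a 6-row `column`, `batch_size = 2`, and the selection
    /// `[select(2), skip(2), select(2)]`, the expected batches are
    /// `[column.slice(0, 2), column.slice(4, 2)]`.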
4211    fn get_expected_batches(
4212        column: &RecordBatch,
4213        selection: &RowSelection,
4214        batch_size: usize,
4215    ) -> Vec<RecordBatch> {
4216        let mut expected_batches = vec![];
4217
4218        let mut selection: VecDeque<_> = selection.clone().into();
4219        let mut row_offset = 0;
4220        let mut last_start = None;
4221        while row_offset < column.num_rows() && !selection.is_empty() {
4222            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4223            while batch_remaining > 0 && !selection.is_empty() {
4224                let (to_read, skip) = match selection.front_mut() {
4225                    Some(selection) if selection.row_count > batch_remaining => {
4226                        selection.row_count -= batch_remaining;
4227                        (batch_remaining, selection.skip)
4228                    }
4229                    Some(_) => {
4230                        let select = selection.pop_front().unwrap();
4231                        (select.row_count, select.skip)
4232                    }
4233                    None => break,
4234                };
4235
4236                batch_remaining -= to_read;
4237
4238                match skip {
4239                    true => {
4240                        if let Some(last_start) = last_start.take() {
4241                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4242                        }
4243                        row_offset += to_read
4244                    }
4245                    false => {
4246                        last_start.get_or_insert(row_offset);
4247                        row_offset += to_read
4248                    }
4249                }
4250            }
4251        }
4252
4253        if let Some(last_start) = last_start.take() {
4254            expected_batches.push(column.slice(last_start, row_offset - last_start))
4255        }
4256
4257        // Sanity check: all batches except the final one should contain exactly `batch_size` rows
4258        for batch in &expected_batches[..expected_batches.len() - 1] {
4259            assert_eq!(batch.num_rows(), batch_size);
4260        }
4261
4262        expected_batches
4263    }
4264
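    /// Builds a `RowSelection` that alternates between runs of `step_len` selected and skipped rows
    /// (starting with a skip when `skip_first` is true) until `total_len` rows are covered,
    /// returning the selection together with the total number of selected rows.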
4265    fn create_test_selection(
4266        step_len: usize,
4267        total_len: usize,
4268        skip_first: bool,
4269    ) -> (RowSelection, usize) {
4270        let mut remaining = total_len;
4271        let mut skip = skip_first;
4272        let mut vec = vec![];
4273        let mut selected_count = 0;
4274        while remaining != 0 {
4275            let step = if remaining > step_len {
4276                step_len
4277            } else {
4278                remaining
4279            };
4280            vec.push(RowSelector {
4281                row_count: step,
4282                skip,
4283            });
4284            remaining -= step;
4285            if !skip {
4286                selected_count += step;
4287            }
4288            skip = !skip;
4289        }
4290        (vec.into(), selected_count)
4291    }
4292
4293    #[test]
4294    fn test_scan_row_with_selection() {
4295        let testdata = arrow::util::test_util::parquet_test_data();
4296        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4297        let test_file = File::open(&path).unwrap();
4298
4299        let mut serial_reader =
4300            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4301        let data = serial_reader.next().unwrap().unwrap();
4302
4303        let do_test = |batch_size: usize, selection_len: usize| {
4304            for skip_first in [false, true] {
4305                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4306
4307                let expected = get_expected_batches(&data, &selections, batch_size);
4308                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4309                assert_eq!(
4310                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4311                    expected,
4312                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4313                );
4314            }
4315        };
4316
4317        // total row count 7300
4318        // 1. test selection len more than one page row count
4319        do_test(1000, 1000);
4320
4321        // 2. test selection len less than one page row count
4322        do_test(20, 20);
4323
4324        // 3. test selection_len less than batch_size
4325        do_test(20, 5);
4326
4327        // 4. test selection_len more than batch_size
4328        //    (i.e. batch_size < selection_len)
4329        do_test(5, 20);
4330
4331        fn create_skip_reader(
4332            test_file: &File,
4333            batch_size: usize,
4334            selections: RowSelection,
4335        ) -> ParquetRecordBatchReader {
4336            let options = ArrowReaderOptions::new().with_page_index(true);
4337            let file = test_file.try_clone().unwrap();
4338            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4339                .unwrap()
4340                .with_batch_size(batch_size)
4341                .with_row_selection(selections)
4342                .build()
4343                .unwrap()
4344        }
4345    }
4346
4347    #[test]
4348    fn test_batch_size_overallocate() {
4349        let testdata = arrow::util::test_util::parquet_test_data();
4350        // `alltypes_plain.parquet` only has 8 rows
4351        let path = format!("{testdata}/alltypes_plain.parquet");
4352        let test_file = File::open(path).unwrap();
4353
4354        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4355        let num_rows = builder.metadata.file_metadata().num_rows();
4356        let reader = builder
4357            .with_batch_size(1024)
4358            .with_projection(ProjectionMask::all())
4359            .build()
4360            .unwrap();
4361        assert_ne!(1024, num_rows);
4362        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4363    }
4364
4365    #[test]
4366    fn test_read_with_page_index_enabled() {
4367        let testdata = arrow::util::test_util::parquet_test_data();
4368
4369        {
4370            // `alltypes_tiny_pages.parquet` has a page index
4371            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4372            let test_file = File::open(path).unwrap();
4373            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4374                test_file,
4375                ArrowReaderOptions::new().with_page_index(true),
4376            )
4377            .unwrap();
4378            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4379            let reader = builder.build().unwrap();
4380            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4381            assert_eq!(batches.len(), 8);
4382        }
4383
4384        {
4385            // `alltypes_plain.parquet` does not have a page index
4386            let path = format!("{testdata}/alltypes_plain.parquet");
4387            let test_file = File::open(path).unwrap();
4388            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4389                test_file,
4390                ArrowReaderOptions::new().with_page_index(true),
4391            )
4392            .unwrap();
4393            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4394            // the file should still be read successfully.
4395            assert!(builder.metadata().offset_index().is_none());
4396            let reader = builder.build().unwrap();
4397            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4398            assert_eq!(batches.len(), 1);
4399        }
4400    }
4401
4402    #[test]
4403    fn test_raw_repetition() {
4404        const MESSAGE_TYPE: &str = "
4405            message Log {
4406              OPTIONAL INT32 eventType;
4407              REPEATED INT32 category;
4408              REPEATED group filter {
4409                OPTIONAL INT32 error;
4410              }
4411            }
4412        ";
4413        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4414        let props = Default::default();
4415
4416        let mut buf = Vec::with_capacity(1024);
4417        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4418        let mut row_group_writer = writer.next_row_group().unwrap();
4419
4420        // column 0
4421        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4422        col_writer
4423            .typed::<Int32Type>()
4424            .write_batch(&[1], Some(&[1]), None)
4425            .unwrap();
4426        col_writer.close().unwrap();
4427        // column 1
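        // two values for the REPEATED `category` field belonging to the same row:
        // definition levels [1, 1] (both present), repetition levels [0, 1]
        // (0 starts a new row, 1 continues the current one)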
4428        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4429        col_writer
4430            .typed::<Int32Type>()
4431            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4432            .unwrap();
4433        col_writer.close().unwrap();
4434        // column 2
4435        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4436        col_writer
4437            .typed::<Int32Type>()
4438            .write_batch(&[1], Some(&[1]), Some(&[0]))
4439            .unwrap();
4440        col_writer.close().unwrap();
4441
4442        let rg_md = row_group_writer.close().unwrap();
4443        assert_eq!(rg_md.num_rows(), 1);
4444        writer.close().unwrap();
4445
4446        let bytes = Bytes::from(buf);
4447
4448        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4449        let full = no_mask.next().unwrap().unwrap();
4450
4451        assert_eq!(full.num_columns(), 3);
4452
4453        for idx in 0..3 {
4454            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4455            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4456            let mut reader = b.with_projection(mask).build().unwrap();
4457            let projected = reader.next().unwrap().unwrap();
4458
4459            assert_eq!(projected.num_columns(), 1);
4460            assert_eq!(full.column(idx), projected.column(0));
4461        }
4462    }
4463
4464    #[test]
4465    fn test_read_lz4_raw() {
4466        let testdata = arrow::util::test_util::parquet_test_data();
4467        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4468        let file = File::open(path).unwrap();
4469
4470        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4471            .unwrap()
4472            .collect::<Result<Vec<_>, _>>()
4473            .unwrap();
4474        assert_eq!(batches.len(), 1);
4475        let batch = &batches[0];
4476
4477        assert_eq!(batch.num_columns(), 3);
4478        assert_eq!(batch.num_rows(), 4);
4479
4480        // https://github.com/apache/parquet-testing/pull/18
4481        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4482        assert_eq!(
4483            a.values(),
4484            &[1593604800, 1593604800, 1593604801, 1593604801]
4485        );
4486
4487        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4488        let a: Vec<_> = a.iter().flatten().collect();
4489        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4490
4491        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4492        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4493    }
4494
4495    // This test ensures backward compatibility: it reads 2 files that both declare the LZ4
4496    // CompressionCodec but were compressed with different algorithms: LZ4_HADOOP and LZ4_RAW.
4497    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4498    //    LZ4_HADOOP algorithm for compression.
4499    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4500    //    LZ4_RAW algorithm for compression. This fallback is kept for backward compatibility with
4501    //    older parquet-cpp versions.
4502    //
4503    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
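    //
    // When decompressing an LZ4-codec column chunk the reader is expected to try the LZ4_HADOOP
    // framing first and fall back to LZ4_RAW if that fails, which is why both files should be
    // readable here.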
4504    #[test]
4505    fn test_read_lz4_hadoop_fallback() {
4506        for file in [
4507            "hadoop_lz4_compressed.parquet",
4508            "non_hadoop_lz4_compressed.parquet",
4509        ] {
4510            let testdata = arrow::util::test_util::parquet_test_data();
4511            let path = format!("{testdata}/{file}");
4512            let file = File::open(path).unwrap();
4513            let expected_rows = 4;
4514
4515            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4516                .unwrap()
4517                .collect::<Result<Vec<_>, _>>()
4518                .unwrap();
4519            assert_eq!(batches.len(), 1);
4520            let batch = &batches[0];
4521
4522            assert_eq!(batch.num_columns(), 3);
4523            assert_eq!(batch.num_rows(), expected_rows);
4524
4525            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4526            assert_eq!(
4527                a.values(),
4528                &[1593604800, 1593604800, 1593604801, 1593604801]
4529            );
4530
4531            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4532            let b: Vec<_> = b.iter().flatten().collect();
4533            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4534
4535            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4536            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4537        }
4538    }
4539
4540    #[test]
4541    fn test_read_lz4_hadoop_large() {
4542        let testdata = arrow::util::test_util::parquet_test_data();
4543        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4544        let file = File::open(path).unwrap();
4545        let expected_rows = 10000;
4546
4547        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4548            .unwrap()
4549            .collect::<Result<Vec<_>, _>>()
4550            .unwrap();
4551        assert_eq!(batches.len(), 1);
4552        let batch = &batches[0];
4553
4554        assert_eq!(batch.num_columns(), 1);
4555        assert_eq!(batch.num_rows(), expected_rows);
4556
4557        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4558        let a: Vec<_> = a.iter().flatten().collect();
4559        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4560        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4561        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4562        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4563    }
4564
4565    #[test]
4566    #[cfg(feature = "snap")]
4567    fn test_read_nested_lists() {
4568        let testdata = arrow::util::test_util::parquet_test_data();
4569        let path = format!("{testdata}/nested_lists.snappy.parquet");
4570        let file = File::open(path).unwrap();
4571
4572        let f = file.try_clone().unwrap();
4573        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4574        let expected = reader.next().unwrap().unwrap();
4575        assert_eq!(expected.num_rows(), 3);
4576
4577        let selection = RowSelection::from(vec![
4578            RowSelector::skip(1),
4579            RowSelector::select(1),
4580            RowSelector::skip(1),
4581        ]);
4582        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4583            .unwrap()
4584            .with_row_selection(selection)
4585            .build()
4586            .unwrap();
4587
4588        let actual = reader.next().unwrap().unwrap();
4589        assert_eq!(actual.num_rows(), 1);
4590        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4591    }
4592
4593    #[test]
4594    fn test_arbitrary_decimal() {
4595        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4596        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4597            .with_precision_and_scale(19, 0)
4598            .unwrap();
4599        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4600            .with_precision_and_scale(12, 0)
4601            .unwrap();
4602        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4603            .with_precision_and_scale(17, 10)
4604            .unwrap();
4605
4606        let written = RecordBatch::try_from_iter([
4607            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4608            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4609            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4610        ])
4611        .unwrap();
4612
4613        let mut buffer = Vec::with_capacity(1024);
4614        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4615        writer.write(&written).unwrap();
4616        writer.close().unwrap();
4617
4618        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4619            .unwrap()
4620            .collect::<Result<Vec<_>, _>>()
4621            .unwrap();
4622
4623        assert_eq!(&written.slice(0, 8), &read[0]);
4624    }
4625
4626    #[test]
4627    fn test_list_skip() {
4628        let mut list = ListBuilder::new(Int32Builder::new());
4629        list.append_value([Some(1), Some(2)]);
4630        list.append_value([Some(3)]);
4631        list.append_value([Some(4)]);
4632        let list = list.finish();
4633        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4634
4635        // First page contains 2 values but only 1 row
4636        let props = WriterProperties::builder()
4637            .set_data_page_row_count_limit(1)
4638            .set_write_batch_size(2)
4639            .build();
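        // (the row-count limit is only checked between write batches of 2 values, so the first
        // list's two values are expected to land together in the first data page)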
4640
4641        let mut buffer = Vec::with_capacity(1024);
4642        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4643        writer.write(&batch).unwrap();
4644        writer.close().unwrap();
4645
4646        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4647        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4648            .unwrap()
4649            .with_row_selection(selection.into())
4650            .build()
4651            .unwrap();
4652        let out = reader.next().unwrap().unwrap();
4653        assert_eq!(out.num_rows(), 1);
4654        assert_eq!(out, batch.slice(2, 1));
4655    }
4656
4657    fn test_decimal32_roundtrip() {
4658        let d = |values: Vec<i32>, p: u8| {
4659            let iter = values.into_iter();
4660            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4661                .with_precision_and_scale(p, 2)
4662                .unwrap()
4663        };
4664
4665        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4666        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4667
4668        let mut buffer = Vec::with_capacity(1024);
4669        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4670        writer.write(&batch).unwrap();
4671        writer.close().unwrap();
4672
4673        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4674        let t1 = builder.parquet_schema().columns()[0].physical_type();
4675        assert_eq!(t1, PhysicalType::INT32);
4676
4677        let mut reader = builder.build().unwrap();
4678        assert_eq!(batch.schema(), reader.schema());
4679
4680        let out = reader.next().unwrap().unwrap();
4681        assert_eq!(batch, out);
4682    }
4683
4684    fn test_decimal64_roundtrip() {
4685        // Precision <= 9 -> INT32
4686        // Precision <= 18 -> INT64
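        // d1 below (precision 9) should therefore map to INT32, while d2 and d3 (precisions 10
        // and 18) should map to INT64, as asserted against the written file's schema further down.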
4687
4688        let d = |values: Vec<i64>, p: u8| {
4689            let iter = values.into_iter();
4690            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4691                .with_precision_and_scale(p, 2)
4692                .unwrap()
4693        };
4694
4695        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4696        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4697        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4698
4699        let batch = RecordBatch::try_from_iter([
4700            ("d1", Arc::new(d1) as ArrayRef),
4701            ("d2", Arc::new(d2) as ArrayRef),
4702            ("d3", Arc::new(d3) as ArrayRef),
4703        ])
4704        .unwrap();
4705
4706        let mut buffer = Vec::with_capacity(1024);
4707        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4708        writer.write(&batch).unwrap();
4709        writer.close().unwrap();
4710
4711        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4712        let t1 = builder.parquet_schema().columns()[0].physical_type();
4713        assert_eq!(t1, PhysicalType::INT32);
4714        let t2 = builder.parquet_schema().columns()[1].physical_type();
4715        assert_eq!(t2, PhysicalType::INT64);
4716        let t3 = builder.parquet_schema().columns()[2].physical_type();
4717        assert_eq!(t3, PhysicalType::INT64);
4718
4719        let mut reader = builder.build().unwrap();
4720        assert_eq!(batch.schema(), reader.schema());
4721
4722        let out = reader.next().unwrap().unwrap();
4723        assert_eq!(batch, out);
4724    }
4725
4726    fn test_decimal_roundtrip<T: DecimalType>() {
4727        // Precision <= 9 -> INT32
4728        // Precision <= 18 -> INT64
4729        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
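        // d1..d4 below are sized (precisions 9, 10, 18 and 19) so that all three physical
        // encodings are exercised and asserted against the written file's schema.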
4730
4731        let d = |values: Vec<usize>, p: u8| {
4732            let iter = values.into_iter().map(T::Native::usize_as);
4733            PrimitiveArray::<T>::from_iter_values(iter)
4734                .with_precision_and_scale(p, 2)
4735                .unwrap()
4736        };
4737
4738        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4739        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4740        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4741        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4742
4743        let batch = RecordBatch::try_from_iter([
4744            ("d1", Arc::new(d1) as ArrayRef),
4745            ("d2", Arc::new(d2) as ArrayRef),
4746            ("d3", Arc::new(d3) as ArrayRef),
4747            ("d4", Arc::new(d4) as ArrayRef),
4748        ])
4749        .unwrap();
4750
4751        let mut buffer = Vec::with_capacity(1024);
4752        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4753        writer.write(&batch).unwrap();
4754        writer.close().unwrap();
4755
4756        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4757        let t1 = builder.parquet_schema().columns()[0].physical_type();
4758        assert_eq!(t1, PhysicalType::INT32);
4759        let t2 = builder.parquet_schema().columns()[1].physical_type();
4760        assert_eq!(t2, PhysicalType::INT64);
4761        let t3 = builder.parquet_schema().columns()[2].physical_type();
4762        assert_eq!(t3, PhysicalType::INT64);
4763        let t4 = builder.parquet_schema().columns()[3].physical_type();
4764        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4765
4766        let mut reader = builder.build().unwrap();
4767        assert_eq!(batch.schema(), reader.schema());
4768
4769        let out = reader.next().unwrap().unwrap();
4770        assert_eq!(batch, out);
4771    }
4772
4773    #[test]
4774    fn test_decimal() {
4775        test_decimal32_roundtrip();
4776        test_decimal64_roundtrip();
4777        test_decimal_roundtrip::<Decimal128Type>();
4778        test_decimal_roundtrip::<Decimal256Type>();
4779    }
4780
4781    #[test]
4782    fn test_list_selection() {
4783        let schema = Arc::new(Schema::new(vec![Field::new_list(
4784            "list",
4785            Field::new_list_field(ArrowDataType::Utf8, true),
4786            false,
4787        )]));
4788        let mut buf = Vec::with_capacity(1024);
4789
4790        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4791
4792        for i in 0..2 {
4793            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4794            for j in 0..1024 {
4795                list_a_builder.values().append_value(format!("{i} {j}"));
4796                list_a_builder.append(true);
4797            }
4798            let batch =
4799                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4800                    .unwrap();
4801            writer.write(&batch).unwrap();
4802        }
4803        let _metadata = writer.close().unwrap();
4804
4805        let buf = Bytes::from(buf);
4806        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4807            .unwrap()
4808            .with_row_selection(RowSelection::from(vec![
4809                RowSelector::skip(100),
4810                RowSelector::select(924),
4811                RowSelector::skip(100),
4812                RowSelector::select(924),
4813            ]))
4814            .build()
4815            .unwrap();
4816
4817        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4818        let batch = concat_batches(&schema, &batches).unwrap();
4819
4820        assert_eq!(batch.num_rows(), 924 * 2);
4821        let list = batch.column(0).as_list::<i32>();
4822
4823        for w in list.value_offsets().windows(2) {
4824            assert_eq!(w[0] + 1, w[1])
4825        }
4826        let mut values = list.values().as_string::<i32>().iter();
4827
4828        for i in 0..2 {
4829            for j in 100..1024 {
4830                let expected = format!("{i} {j}");
4831                assert_eq!(values.next().unwrap().unwrap(), &expected);
4832            }
4833        }
4834    }
4835
4836    #[test]
4837    fn test_list_selection_fuzz() {
4838        let mut rng = rng();
4839        let schema = Arc::new(Schema::new(vec![Field::new_list(
4840            "list",
4841            Field::new_list(
4842                Field::LIST_FIELD_DEFAULT_NAME,
4843                Field::new_list_field(ArrowDataType::Int32, true),
4844                true,
4845            ),
4846            true,
4847        )]));
4848        let mut buf = Vec::with_capacity(1024);
4849        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4850
4851        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4852
4853        for _ in 0..2048 {
4854            if rng.random_bool(0.2) {
4855                list_a_builder.append(false);
4856                continue;
4857            }
4858
4859            let list_a_len = rng.random_range(0..10);
4860            let list_b_builder = list_a_builder.values();
4861
4862            for _ in 0..list_a_len {
4863                if rng.random_bool(0.2) {
4864                    list_b_builder.append(false);
4865                    continue;
4866                }
4867
4868                let list_b_len = rng.random_range(0..10);
4869                let int_builder = list_b_builder.values();
4870                for _ in 0..list_b_len {
4871                    match rng.random_bool(0.2) {
4872                        true => int_builder.append_null(),
4873                        false => int_builder.append_value(rng.random()),
4874                    }
4875                }
4876                list_b_builder.append(true)
4877            }
4878            list_a_builder.append(true);
4879        }
4880
4881        let array = Arc::new(list_a_builder.finish());
4882        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4883
4884        writer.write(&batch).unwrap();
4885        let _metadata = writer.close().unwrap();
4886
4887        let buf = Bytes::from(buf);
4888
4889        let cases = [
4890            vec![
4891                RowSelector::skip(100),
4892                RowSelector::select(924),
4893                RowSelector::skip(100),
4894                RowSelector::select(924),
4895            ],
4896            vec![
4897                RowSelector::select(924),
4898                RowSelector::skip(100),
4899                RowSelector::select(924),
4900                RowSelector::skip(100),
4901            ],
4902            vec![
4903                RowSelector::skip(1023),
4904                RowSelector::select(1),
4905                RowSelector::skip(1023),
4906                RowSelector::select(1),
4907            ],
4908            vec![
4909                RowSelector::select(1),
4910                RowSelector::skip(1023),
4911                RowSelector::select(1),
4912                RowSelector::skip(1023),
4913            ],
4914        ];
4915
4916        for batch_size in [100, 1024, 2048] {
4917            for selection in &cases {
4918                let selection = RowSelection::from(selection.clone());
4919                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4920                    .unwrap()
4921                    .with_row_selection(selection.clone())
4922                    .with_batch_size(batch_size)
4923                    .build()
4924                    .unwrap();
4925
4926                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4927                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4928                assert_eq!(actual.num_rows(), selection.row_count());
4929
4930                let mut batch_offset = 0;
4931                let mut actual_offset = 0;
4932                for selector in selection.iter() {
4933                    if selector.skip {
4934                        batch_offset += selector.row_count;
4935                        continue;
4936                    }
4937
4938                    assert_eq!(
4939                        batch.slice(batch_offset, selector.row_count),
4940                        actual.slice(actual_offset, selector.row_count)
4941                    );
4942
4943                    batch_offset += selector.row_count;
4944                    actual_offset += selector.row_count;
4945                }
4946            }
4947        }
4948    }
4949
4950    #[test]
4951    fn test_read_old_nested_list() {
4952        use arrow::datatypes::DataType;
4953        use arrow::datatypes::ToByteSlice;
4954
4955        let testdata = arrow::util::test_util::parquet_test_data();
4956        // message my_record {
4957        //     REQUIRED group a (LIST) {
4958        //         REPEATED group array (LIST) {
4959        //             REPEATED INT32 array;
4960        //         }
4961        //     }
4962        // }
4963        // should be read as list<list<int32>>
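        // (a legacy list encoding handled by the backward-compatibility rules in the Parquet
        // LogicalTypes specification)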
4964        let path = format!("{testdata}/old_list_structure.parquet");
4965        let test_file = File::open(path).unwrap();
4966
4967        // create expected ListArray
4968        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4969
4970        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4971        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4972
4973        // Construct a list array from the above two
4974        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4975            "array",
4976            DataType::Int32,
4977            false,
4978        ))))
4979        .len(2)
4980        .add_buffer(a_value_offsets)
4981        .add_child_data(a_values.into_data())
4982        .build()
4983        .unwrap();
4984        let a = ListArray::from(a_list_data);
4985
4986        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4987        let mut reader = builder.build().unwrap();
4988        let out = reader.next().unwrap().unwrap();
4989        assert_eq!(out.num_rows(), 1);
4990        assert_eq!(out.num_columns(), 1);
4991        // grab first column
4992        let c0 = out.column(0);
4993        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4994        // get first row: [[1, 2], [3, 4]]
4995        let r0 = c0arr.value(0);
4996        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4997        assert_eq!(r0arr, &a);
4998    }
4999
5000    #[test]
5001    fn test_map_no_value() {
5002        // File schema:
5003        // message schema {
5004        //   required group my_map (MAP) {
5005        //     repeated group key_value {
5006        //       required int32 key;
5007        //       optional int32 value;
5008        //     }
5009        //   }
5010        //   required group my_map_no_v (MAP) {
5011        //     repeated group key_value {
5012        //       required int32 key;
5013        //     }
5014        //   }
5015        //   required group my_list (LIST) {
5016        //     repeated group list {
5017        //       required int32 element;
5018        //     }
5019        //   }
5020        // }
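        // Because the key_value group of my_map_no_v has no value field, it is read back as a
        // list of its keys, which should match the my_list column (asserted below).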
5021        let testdata = arrow::util::test_util::parquet_test_data();
5022        let path = format!("{testdata}/map_no_value.parquet");
5023        let file = File::open(path).unwrap();
5024
5025        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5026            .unwrap()
5027            .build()
5028            .unwrap();
5029        let out = reader.next().unwrap().unwrap();
5030        assert_eq!(out.num_rows(), 3);
5031        assert_eq!(out.num_columns(), 3);
5032        // the my_map_no_v and my_list columns should be read back as equivalent list arrays
5033        let c0 = out.column(1).as_list::<i32>();
5034        let c1 = out.column(2).as_list::<i32>();
5035        assert_eq!(c0.len(), c1.len());
5036        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5037    }
5038
5039    #[test]
5040    fn test_get_row_group_column_bloom_filter_with_length() {
5041        // rewrite to a new parquet file whose metadata includes bloom_filter_length
5042        let testdata = arrow::util::test_util::parquet_test_data();
5043        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5044        let file = File::open(path).unwrap();
5045        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5046        let schema = builder.schema().clone();
5047        let reader = builder.build().unwrap();
5048
5049        let mut parquet_data = Vec::new();
5050        let props = WriterProperties::builder()
5051            .set_bloom_filter_enabled(true)
5052            .build();
5053        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
5054        for batch in reader {
5055            let batch = batch.unwrap();
5056            writer.write(&batch).unwrap();
5057        }
5058        writer.close().unwrap();
5059
5060        // test the new parquet file
5061        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
5062    }
5063
5064    #[test]
5065    fn test_get_row_group_column_bloom_filter_without_length() {
5066        let testdata = arrow::util::test_util::parquet_test_data();
5067        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5068        let data = Bytes::from(std::fs::read(path).unwrap());
5069        test_get_row_group_column_bloom_filter(data, false);
5070    }
5071
5072    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5073        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5074
5075        let metadata = builder.metadata();
5076        assert_eq!(metadata.num_row_groups(), 1);
5077        let row_group = metadata.row_group(0);
5078        let column = row_group.column(0);
5079        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5080
5081        let sbbf = builder
5082            .get_row_group_column_bloom_filter(0, 0)
5083            .unwrap()
5084            .unwrap();
5085        assert!(sbbf.check(&"Hello"));
5086        assert!(!sbbf.check(&"Hello_Not_Exists"));
5087    }
5088
5089    #[test]
5090    fn test_read_unknown_logical_type() {
5091        let testdata = arrow::util::test_util::parquet_test_data();
5092        let path = format!("{testdata}/unknown-logical-type.parquet");
5093        let test_file = File::open(path).unwrap();
5094
5095        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5096            .expect("Error creating reader builder");
5097
5098        let schema = builder.metadata().file_metadata().schema_descr();
5099        assert_eq!(
5100            schema.column(0).logical_type_ref(),
5101            Some(&LogicalType::String)
5102        );
5103        assert_eq!(
5104            schema.column(1).logical_type_ref(),
5105            Some(&LogicalType::_Unknown { field_id: 2555 })
5106        );
5107        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5108
5109        let mut reader = builder.build().unwrap();
5110        let out = reader.next().unwrap().unwrap();
5111        assert_eq!(out.num_rows(), 3);
5112        assert_eq!(out.num_columns(), 2);
5113    }
5114}