parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a reader that reads Parquet data into Arrow [`RecordBatch`]es
19
20use arrow_array::Array;
21use arrow_array::cast::AsArray;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
32use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
33use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
34use crate::bloom_filter::{
35    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
36};
37use crate::column::page::{PageIterator, PageReader};
38#[cfg(feature = "encryption")]
39use crate::encryption::decrypt::FileDecryptionProperties;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
42use crate::file::reader::{ChunkReader, SerializedPageReader};
43use crate::schema::types::SchemaDescriptor;
44
45use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
46pub use read_plan::{ReadPlan, ReadPlanBuilder};
47
48mod filter;
49pub mod metrics;
50mod read_plan;
51mod selection;
52pub mod statistics;
53
54/// Builder for constructing Parquet readers that decode into [Apache Arrow]
55/// arrays.
56///
57/// Most users should use one of the following specializations:
58///
59/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
60/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
61/// * decoder API: [`ParquetDecoderBuilder::new`]
62///
63/// # Features
64/// * Projection pushdown: [`Self::with_projection`]
65/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Offset and limit: [`Self::with_offset`] and [`Self::with_limit`]
67/// * Row group filtering: [`Self::with_row_groups`]
68/// * Range filtering: [`Self::with_row_selection`]
69/// * Row level filtering: [`Self::with_row_filter`]
70///
71/// # Implementing Predicate Pushdown
72///
/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the most low-level optimizations.
75///
76/// However, most Parquet based systems will apply filters at many steps prior
77/// to decoding such as pruning files, row groups and data pages. This crate
78/// provides the low level APIs needed to implement such filtering, but does not
79/// include any logic to actually evaluate predicates. For example:
80///
81/// * [`Self::with_row_groups`] for Row Group pruning
82/// * [`Self::with_row_selection`] for data page pruning
83/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
84///
/// The rationale for this design is that implementing predicate pushdown is a
/// complex topic and varies significantly from system to system. For example:
///
/// 1. Predicates supported (do you support predicates like prefix matching, user-defined functions, etc.?)
89/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
90/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
91/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
92///
93/// You can read more about this design in the [Querying Parquet with
94/// Millisecond Latency] Arrow blog post.
95///
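/// # Example
///
/// For illustration only, a minimal sketch combining a few of the features
/// above (assuming `file` is an in-memory `Bytes` buffer containing Parquet
/// data; the leaf column indices and row group index are hypothetical):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(file: Bytes) -> parquet::errors::Result<()> {
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
/// // Only decode the first two leaf columns of the first row group,
/// // and stop after at most 100 rows have been produced
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
/// let reader = builder
///     .with_projection(mask)
///     .with_row_groups(vec![0])
///     .with_limit(100)
///     .build()?;
/// for batch in reader {
///     let _batch = batch.unwrap();
/// }
/// # Ok(())
/// # }
/// ```
///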
96/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
97/// [`ParquetDecoderBuilder::new`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
98/// [Apache Arrow]: https://arrow.apache.org/
99/// [`StatisticsConverter`]: statistics::StatisticsConverter
100/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
101pub struct ArrowReaderBuilder<T> {
102    pub(crate) input: T,
103
104    pub(crate) metadata: Arc<ParquetMetaData>,
105
106    pub(crate) schema: SchemaRef,
107
108    pub(crate) fields: Option<Arc<ParquetField>>,
109
110    pub(crate) batch_size: usize,
111
112    pub(crate) row_groups: Option<Vec<usize>>,
113
114    pub(crate) projection: ProjectionMask,
115
116    pub(crate) filter: Option<RowFilter>,
117
118    pub(crate) selection: Option<RowSelection>,
119
120    pub(crate) limit: Option<usize>,
121
122    pub(crate) offset: Option<usize>,
123
124    pub(crate) metrics: ArrowReaderMetrics,
125
126    pub(crate) max_predicate_cache_size: usize,
127}
128
129impl<T: Debug> Debug for ArrowReaderBuilder<T> {
130    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("ArrowReaderBuilder<T>")
132            .field("input", &self.input)
133            .field("metadata", &self.metadata)
134            .field("schema", &self.schema)
135            .field("fields", &self.fields)
136            .field("batch_size", &self.batch_size)
137            .field("row_groups", &self.row_groups)
138            .field("projection", &self.projection)
139            .field("filter", &self.filter)
140            .field("selection", &self.selection)
141            .field("limit", &self.limit)
142            .field("offset", &self.offset)
143            .field("metrics", &self.metrics)
144            .finish()
145    }
146}
147
148impl<T> ArrowReaderBuilder<T> {
149    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
150        Self {
151            input,
152            metadata: metadata.metadata,
153            schema: metadata.schema,
154            fields: metadata.fields,
155            batch_size: 1024,
156            row_groups: None,
157            projection: ProjectionMask::all(),
158            filter: None,
159            selection: None,
160            limit: None,
161            offset: None,
162            metrics: ArrowReaderMetrics::Disabled,
163            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
164        }
165    }
166
167    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
168    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
169        &self.metadata
170    }
171
172    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
173    pub fn parquet_schema(&self) -> &SchemaDescriptor {
174        self.metadata.file_metadata().schema_descr()
175    }
176
177    /// Returns the arrow [`SchemaRef`] for this parquet file
178    pub fn schema(&self) -> &SchemaRef {
179        &self.schema
180    }
181
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// If `batch_size` is larger than the number of rows in the file, the file's row count is used instead.
184    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a large buffer
186        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
187        Self { batch_size, ..self }
188    }
189
190    /// Only read data from the provided row group indexes
191    ///
192    /// This is also called row group filtering
193    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
194        Self {
195            row_groups: Some(row_groups),
196            ..self
197        }
198    }
199
200    /// Only read data from the provided column indexes
201    pub fn with_projection(self, mask: ProjectionMask) -> Self {
202        Self {
203            projection: mask,
204            ..self
205        }
206    }
207
208    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
209    /// data into memory.
210    ///
211    /// This feature is used to restrict which rows are decoded within row
212    /// groups, skipping ranges of rows that are not needed. Such selections
213    /// could be determined by evaluating predicates against the parquet page
214    /// [`Index`] or some other external information available to a query
215    /// engine.
216    ///
217    /// # Notes
218    ///
219    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
220    /// applying the row selection, and therefore rows from skipped row groups
221    /// should not be included in the [`RowSelection`] (see example below)
222    ///
223    /// It is recommended to enable writing the page index if using this
224    /// functionality, to allow more efficient skipping over data pages. See
225    /// [`ArrowReaderOptions::with_page_index`].
226    ///
227    /// # Example
228    ///
229    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
230    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
231    /// row group 3:
232    ///
233    /// ```text
234    ///   Row Group 0, 1000 rows (selected)
235    ///   Row Group 1, 1000 rows (skipped)
236    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
237    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
238    /// ```
239    ///
240    /// You could pass the following [`RowSelection`]:
241    ///
242    /// ```text
243    ///  Select 1000    (scan all rows in row group 0)
244    ///  Skip 50        (skip the first 50 rows in row group 2)
245    ///  Select 50      (scan rows 50-100 in row group 2)
246    ///  Skip 900       (skip the remaining rows in row group 2)
247    ///  Skip 200       (skip the first 200 rows in row group 3)
248    ///  Select 100     (scan rows 200-300 in row group 3)
249    ///  Skip 700       (skip the remaining rows in row group 3)
250    /// ```
251    /// Note there is no entry for the (entirely) skipped row group 1.
252    ///
253    /// Note you can represent the same selection with fewer entries. Instead of
254    ///
255    /// ```text
256    ///  Skip 900       (skip the remaining rows in row group 2)
257    ///  Skip 200       (skip the first 200 rows in row group 3)
258    /// ```
259    ///
260    /// you could use
261    ///
262    /// ```text
263    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
264    /// ```
265    ///
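    /// The condensed selection above could be constructed like this (a sketch,
    /// matching the hypothetical file layout described above):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // all of row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // rest of row group 2 + first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // rest of row group 3
    /// ]);
    /// ```
    ///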
266    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
267    pub fn with_row_selection(self, selection: RowSelection) -> Self {
268        Self {
269            selection: Some(selection),
270            ..self
271        }
272    }
273
274    /// Provide a [`RowFilter`] to skip decoding rows
275    ///
276    /// Row filters are applied after row group selection and row selection
277    ///
278    /// It is recommended to enable reading the page index if using this functionality, to allow
279    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
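    ///
    /// For illustration, a sketch of constructing a trivial filter (the
    /// predicate here keeps every row; a real predicate would compute a
    /// meaningful boolean array from the projected columns):
    ///
    /// ```
    /// # use arrow_array::BooleanArray;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    /// let predicate = ArrowPredicateFn::new(
    ///     ProjectionMask::all(), // columns the predicate needs to evaluate
    ///     |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
    /// );
    /// let filter = RowFilter::new(vec![Box::new(predicate)]);
    /// ```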
280    pub fn with_row_filter(self, filter: RowFilter) -> Self {
281        Self {
282            filter: Some(filter),
283            ..self
284        }
285    }
286
287    /// Provide a limit to the number of rows to be read
288    ///
289    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
290    /// allowing it to limit the final set of rows decoded after any pushed down predicates
291    ///
292    /// It is recommended to enable reading the page index if using this functionality, to allow
293    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
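    ///
    /// For illustration, a sketch of paginating with an offset and a limit
    /// (assuming `builder` is an already-configured
    /// [`ParquetRecordBatchReaderBuilder`] over in-memory bytes):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<Bytes>) -> parquet::errors::Result<()> {
    /// // Skip the first 100 rows that pass any filters, then decode at most 10 rows
    /// let _reader = builder.with_offset(100).with_limit(10).build()?;
    /// # Ok(())
    /// # }
    /// ```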
294    pub fn with_limit(self, limit: usize) -> Self {
295        Self {
296            limit: Some(limit),
297            ..self
298        }
299    }
300
301    /// Provide an offset to skip over the given number of rows
302    ///
303    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
304    /// allowing it to skip rows after any pushed down predicates
305    ///
306    /// It is recommended to enable reading the page index if using this functionality, to allow
307    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
308    pub fn with_offset(self, offset: usize) -> Self {
309        Self {
310            offset: Some(offset),
311            ..self
312        }
313    }
314
315    /// Specify metrics collection during reading
316    ///
317    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
318    /// clone of the provided metrics to the builder.
319    ///
320    /// For example:
321    ///
322    /// ```rust
323    /// # use std::sync::Arc;
324    /// # use bytes::Bytes;
325    /// # use arrow_array::{Int32Array, RecordBatch};
326    /// # use arrow_schema::{DataType, Field, Schema};
327    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
328    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
329    /// # use parquet::arrow::ArrowWriter;
330    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
331    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
332    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
333    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
334    /// # writer.write(&batch).unwrap();
335    /// # writer.close().unwrap();
336    /// # let file = Bytes::from(file);
337    /// // Create metrics object to pass into the reader
338    /// let metrics = ArrowReaderMetrics::enabled();
339    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
340    ///   // Configure the builder to use the metrics by passing a clone
341    ///   .with_metrics(metrics.clone())
342    ///   // Build the reader
343    ///   .build().unwrap();
344    /// // .. read data from the reader ..
345    ///
346    /// // check the metrics
347    /// assert!(metrics.records_read_from_inner().is_some());
348    /// ```
349    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
350        Self { metrics, ..self }
351    }
352
353    /// Set the maximum size (per row group) of the predicate cache in bytes for
354    /// the async decoder.
355    ///
356    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
357    /// unlimited cache size.
358    ///
359    /// This cache is used to store decoded arrays that are used in
360    /// predicate evaluation ([`Self::with_row_filter`]).
361    ///
362    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
363    /// [this ticket] for more details and alternatives.
364    ///
365    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
366    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
367    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
368        Self {
369            max_predicate_cache_size,
370            ..self
371        }
372    }
373}
374
375/// Options that control how metadata is read for a parquet file
376///
377/// See [`ArrowReaderBuilder`] for how to configure how the column data
378/// is then read from the file, including projection and filter pushdown
379#[derive(Debug, Clone, Default)]
380pub struct ArrowReaderOptions {
381    /// Should the reader strip any user defined metadata from the Arrow schema
382    skip_arrow_metadata: bool,
    /// If provided, used as the schema hint when determining the Arrow schema;
    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
385    ///
386    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
387    supplied_schema: Option<SchemaRef>,
388    /// Policy for reading offset and column indexes.
389    pub(crate) page_index_policy: PageIndexPolicy,
390    /// If encryption is enabled, the file decryption properties can be provided
391    #[cfg(feature = "encryption")]
392    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
393}
394
395impl ArrowReaderOptions {
396    /// Create a new [`ArrowReaderOptions`] with the default settings
397    pub fn new() -> Self {
398        Self::default()
399    }
400
401    /// Skip decoding the embedded arrow metadata (defaults to `false`)
402    ///
403    /// Parquet files generated by some writers may contain embedded arrow
404    /// schema and metadata.
405    /// This may not be correct or compatible with your system,
406    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
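    ///
    /// For example, to ignore any embedded Arrow schema and derive the schema
    /// purely from the Parquet types:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    /// ```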
407    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
408        Self {
409            skip_arrow_metadata,
410            ..self
411        }
412    }
413
414    /// Provide a schema hint to use when reading the Parquet file.
415    ///
416    /// If provided, this schema takes precedence over any arrow schema embedded
417    /// in the metadata (see the [`arrow`] documentation for more details).
418    ///
419    /// If the provided schema is not compatible with the data stored in the
420    /// parquet file schema, an error will be returned when constructing the
421    /// builder.
422    ///
423    /// This option is only required if you want to explicitly control the
424    /// conversion of Parquet types to Arrow types, such as casting a column to
425    /// a different type. For example, if you wanted to read an Int64 in
426    /// a Parquet file to a [`TimestampMicrosecondArray`] in the Arrow schema.
427    ///
428    /// [`arrow`]: crate::arrow
429    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
430    ///
431    /// # Notes
432    ///
433    /// The provided schema must have the same number of columns as the parquet schema and
434    /// the column names must be the same.
435    ///
436    /// # Example
437    /// ```
439    /// use std::sync::Arc;
440    /// use tempfile::tempfile;
441    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
442    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
443    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
444    /// use parquet::arrow::ArrowWriter;
445    ///
446    /// // Write data - schema is inferred from the data to be Int32
447    /// let file = tempfile().unwrap();
448    /// let batch = RecordBatch::try_from_iter(vec![
449    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
450    /// ]).unwrap();
451    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
452    /// writer.write(&batch).unwrap();
453    /// writer.close().unwrap();
454    ///
455    /// // Read the file back.
456    /// // Supply a schema that interprets the Int32 column as a Timestamp.
457    /// let supplied_schema = Arc::new(Schema::new(vec![
458    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
459    /// ]));
460    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
461    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
462    ///     file.try_clone().unwrap(),
463    ///     options
464    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
465    ///
466    /// // Create the reader and read the data using the supplied schema.
467    /// let mut reader = builder.build().unwrap();
468    /// let _batch = reader.next().unwrap().unwrap();
469    /// ```
470    pub fn with_schema(self, schema: SchemaRef) -> Self {
471        Self {
472            supplied_schema: Some(schema),
473            skip_arrow_metadata: true,
474            ..self
475        }
476    }
477
478    /// Enable reading [`PageIndex`], if present (defaults to `false`)
479    ///
    /// Some query engines use the `PageIndex` to push down predicates to the
    /// parquet scan, potentially eliminating unnecessary IO.
482    ///
483    /// If this is enabled, [`ParquetMetaData::column_index`] and
484    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
485    /// information is present in the file.
486    ///
487    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
488    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
489    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
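    ///
    /// For example:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// assert!(options.page_index());
    /// ```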
490    pub fn with_page_index(self, page_index: bool) -> Self {
491        let page_index_policy = PageIndexPolicy::from(page_index);
492
493        Self {
494            page_index_policy,
495            ..self
496        }
497    }
498
499    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
500    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
501        Self {
502            page_index_policy: policy,
503            ..self
504        }
505    }
506
507    /// Provide the file decryption properties to use when reading encrypted parquet files.
508    ///
509    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
510    #[cfg(feature = "encryption")]
511    pub fn with_file_decryption_properties(
512        self,
513        file_decryption_properties: Arc<FileDecryptionProperties>,
514    ) -> Self {
515        Self {
516            file_decryption_properties: Some(file_decryption_properties),
517            ..self
518        }
519    }
520
521    /// Retrieve the currently set page index behavior.
522    ///
523    /// This can be set via [`with_page_index`][Self::with_page_index].
524    pub fn page_index(&self) -> bool {
525        self.page_index_policy != PageIndexPolicy::Skip
526    }
527
528    /// Retrieve the currently set file decryption properties.
529    ///
530    /// This can be set via
531    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
532    #[cfg(feature = "encryption")]
533    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
534        self.file_decryption_properties.as_ref()
535    }
536}
537
538/// The metadata necessary to construct a [`ArrowReaderBuilder`]
539///
/// Note this structure is cheap to clone as it consists of several `Arc`s.
541///
542/// This structure allows
543///
544/// 1. Loading metadata for a file once and then using that same metadata to
545///    construct multiple separate readers, for example, to distribute readers
546///    across multiple threads
547///
548/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
549///    from the file each time a reader is constructed.
550///
551/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
552#[derive(Debug, Clone)]
553pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
555    pub(crate) metadata: Arc<ParquetMetaData>,
556    /// The Arrow Schema
557    pub(crate) schema: SchemaRef,
558
559    pub(crate) fields: Option<Arc<ParquetField>>,
560}
561
562impl ArrowReaderMetadata {
563    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
564    ///
565    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
566    /// example of how this can be used
567    ///
568    /// # Notes
569    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, but
    /// `Self::metadata` is missing the page index, this function will attempt
    /// to load the page index by performing additional reads from the `reader`.
573    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
574        let metadata =
575            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
576        #[cfg(feature = "encryption")]
577        let metadata = metadata.with_decryption_properties(
578            options.file_decryption_properties.as_ref().map(Arc::clone),
579        );
580        let metadata = metadata.parse_and_finish(reader)?;
581        Self::try_new(Arc::new(metadata), options)
582    }
583
584    /// Create a new [`ArrowReaderMetadata`]
585    ///
586    /// # Notes
587    ///
588    /// This function does not attempt to load the PageIndex if not present in the metadata.
589    /// See [`Self::load`] for more details.
590    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
591        match options.supplied_schema {
592            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
593            None => {
594                let kv_metadata = match options.skip_arrow_metadata {
595                    true => None,
596                    false => metadata.file_metadata().key_value_metadata(),
597                };
598
599                let (schema, fields) = parquet_to_arrow_schema_and_fields(
600                    metadata.file_metadata().schema_descr(),
601                    ProjectionMask::all(),
602                    kv_metadata,
603                )?;
604
605                Ok(Self {
606                    metadata,
607                    schema: Arc::new(schema),
608                    fields: fields.map(Arc::new),
609                })
610            }
611        }
612    }
613
614    fn with_supplied_schema(
615        metadata: Arc<ParquetMetaData>,
616        supplied_schema: SchemaRef,
617    ) -> Result<Self> {
618        let parquet_schema = metadata.file_metadata().schema_descr();
619        let field_levels = parquet_to_arrow_field_levels(
620            parquet_schema,
621            ProjectionMask::all(),
622            Some(supplied_schema.fields()),
623        )?;
624        let fields = field_levels.fields;
625        let inferred_len = fields.len();
626        let supplied_len = supplied_schema.fields().len();
627        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
        // different lengths, but we check here to be safe.
630        if inferred_len != supplied_len {
631            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
633                inferred_len, supplied_len
634            )));
635        }
636
637        let mut errors = Vec::new();
638
639        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
640
641        for (field1, field2) in field_iter {
642            if field1.data_type() != field2.data_type() {
643                errors.push(format!(
644                    "data type mismatch for field {}: requested {} but found {}",
645                    field1.name(),
646                    field1.data_type(),
647                    field2.data_type()
648                ));
649            }
650            if field1.is_nullable() != field2.is_nullable() {
651                errors.push(format!(
652                    "nullability mismatch for field {}: expected {:?} but found {:?}",
653                    field1.name(),
654                    field1.is_nullable(),
655                    field2.is_nullable()
656                ));
657            }
658            if field1.metadata() != field2.metadata() {
659                errors.push(format!(
660                    "metadata mismatch for field {}: expected {:?} but found {:?}",
661                    field1.name(),
662                    field1.metadata(),
663                    field2.metadata()
664                ));
665            }
666        }
667
668        if !errors.is_empty() {
669            let message = errors.join(", ");
670            return Err(ParquetError::ArrowError(format!(
671                "Incompatible supplied Arrow schema: {message}",
672            )));
673        }
674
675        Ok(Self {
676            metadata,
677            schema: supplied_schema,
678            fields: field_levels.levels.map(Arc::new),
679        })
680    }
681
682    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
683    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
684        &self.metadata
685    }
686
687    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
688    pub fn parquet_schema(&self) -> &SchemaDescriptor {
689        self.metadata.file_metadata().schema_descr()
690    }
691
692    /// Returns the arrow [`SchemaRef`] for this parquet file
693    pub fn schema(&self) -> &SchemaRef {
694        &self.schema
695    }
696}
697
698#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async
700pub struct SyncReader<T: ChunkReader>(T);
701
702impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
703    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
704        f.debug_tuple("SyncReader").field(&self.0).finish()
705    }
706}
707
708/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
709///
710/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
711///
712/// See [`ArrowReaderBuilder`] for additional member functions
713pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
714
715impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
716    /// Create a new [`ParquetRecordBatchReaderBuilder`]
717    ///
718    /// ```
719    /// # use std::sync::Arc;
720    /// # use bytes::Bytes;
721    /// # use arrow_array::{Int32Array, RecordBatch};
722    /// # use arrow_schema::{DataType, Field, Schema};
723    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
724    /// # use parquet::arrow::ArrowWriter;
725    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
726    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
727    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
728    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
729    /// # writer.write(&batch).unwrap();
730    /// # writer.close().unwrap();
731    /// # let file = Bytes::from(file);
732    /// #
733    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
734    ///
735    /// // Inspect metadata
736    /// assert_eq!(builder.metadata().num_row_groups(), 1);
737    ///
738    /// // Construct reader
739    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
740    ///
741    /// // Read data
742    /// let _batch = reader.next().unwrap().unwrap();
743    /// ```
744    pub fn try_new(reader: T) -> Result<Self> {
745        Self::try_new_with_options(reader, Default::default())
746    }
747
748    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
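    ///
    /// For illustration, a sketch (assuming `file` is an in-memory `Bytes`
    /// buffer containing Parquet data):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// # fn example(file: Bytes) -> parquet::errors::Result<()> {
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let _builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    /// # Ok(())
    /// # }
    /// ```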
749    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
750        let metadata = ArrowReaderMetadata::load(&reader, options)?;
751        Ok(Self::new_with_metadata(reader, metadata))
752    }
753
754    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
755    ///
756    /// This interface allows:
757    ///
758    /// 1. Loading metadata once and using it to create multiple builders with
759    ///    potentially different settings or run on different threads
760    ///
761    /// 2. Using a cached copy of the metadata rather than re-reading it from the
762    ///    file each time a reader is constructed.
763    ///
764    /// See the docs on [`ArrowReaderMetadata`] for more details
765    ///
766    /// # Example
767    /// ```
768    /// # use std::fs::metadata;
769    /// # use std::sync::Arc;
770    /// # use bytes::Bytes;
771    /// # use arrow_array::{Int32Array, RecordBatch};
772    /// # use arrow_schema::{DataType, Field, Schema};
773    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
774    /// # use parquet::arrow::ArrowWriter;
775    /// #
776    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
777    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
778    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
779    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
780    /// # writer.write(&batch).unwrap();
781    /// # writer.close().unwrap();
782    /// # let file = Bytes::from(file);
783    /// #
784    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
785    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
786    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
787    ///
788    /// // Should be able to read from both in parallel
789    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
790    /// ```
791    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
792        Self::new_builder(SyncReader(input), metadata)
793    }
794
795    /// Read bloom filter for a column in a row group
796    ///
797    /// Returns `None` if the column does not have a bloom filter
798    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
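    ///
    /// For illustration, a sketch of probing a bloom filter (assuming `file`
    /// is an in-memory `Bytes` buffer containing Parquet data written with
    /// bloom filters enabled; `0, 0` are a hypothetical row group and column
    /// index):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(file: Bytes) -> parquet::errors::Result<()> {
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // `check` may return false positives but never false negatives
    ///     let _might_contain = sbbf.check(&123_i32);
    /// }
    /// # Ok(())
    /// # }
    /// ```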
800    pub fn get_row_group_column_bloom_filter(
801        &self,
802        row_group_idx: usize,
803        column_idx: usize,
804    ) -> Result<Option<Sbbf>> {
805        let metadata = self.metadata.row_group(row_group_idx);
806        let column_metadata = metadata.column(column_idx);
807
808        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
809            offset
810                .try_into()
811                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
812        } else {
813            return Ok(None);
814        };
815
816        let buffer = match column_metadata.bloom_filter_length() {
817            Some(length) => self.input.0.get_bytes(offset, length as usize),
818            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
819        }?;
820
821        let (header, bitset_offset) =
822            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
823
824        match header.algorithm {
825            BloomFilterAlgorithm::BLOCK => {
826                // this match exists to future proof the singleton algorithm enum
827            }
828        }
829        match header.compression {
830            BloomFilterCompression::UNCOMPRESSED => {
831                // this match exists to future proof the singleton compression enum
832            }
833        }
834        match header.hash {
835            BloomFilterHash::XXHASH => {
836                // this match exists to future proof the singleton hash enum
837            }
838        }
839
840        let bitset = match column_metadata.bloom_filter_length() {
841            Some(_) => buffer.slice(
842                (TryInto::<usize>::try_into(bitset_offset).unwrap()
843                    - TryInto::<usize>::try_into(offset).unwrap())..,
844            ),
845            None => {
846                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
847                    ParquetError::General("Bloom filter length is invalid".to_string())
848                })?;
849                self.input.0.get_bytes(bitset_offset, bitset_length)?
850            }
851        };
852        Ok(Some(Sbbf::new(&bitset)))
853    }
854
855    /// Build a [`ParquetRecordBatchReader`]
856    ///
857    /// Note: this will eagerly evaluate any `RowFilter` before returning
858    pub fn build(self) -> Result<ParquetRecordBatchReader> {
859        let Self {
860            input,
861            metadata,
862            schema: _,
863            fields,
864            batch_size: _,
865            row_groups,
866            projection,
867            mut filter,
868            selection,
869            limit,
870            offset,
871            metrics,
872            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
873            max_predicate_cache_size: _,
874        } = self;
875
        // Try to avoid allocating a large buffer
877        let batch_size = self
878            .batch_size
879            .min(metadata.file_metadata().num_rows() as usize);
880
881        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
882
883        let reader = ReaderRowGroups {
884            reader: Arc::new(input.0),
885            metadata,
886            row_groups,
887        };
888
889        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
890
891        // Update selection based on any filters
892        if let Some(filter) = filter.as_mut() {
893            for predicate in filter.predicates.iter_mut() {
894                // break early if we have ruled out all rows
895                if !plan_builder.selects_any() {
896                    break;
897                }
898
899                let mut cache_projection = predicate.projection().clone();
900                cache_projection.intersect(&projection);
901
902                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
903                    .build_array_reader(fields.as_deref(), predicate.projection())?;
904
905                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
906            }
907        }
908
909        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
910            .build_array_reader(fields.as_deref(), &projection)?;
911
912        let read_plan = plan_builder
913            .limited(reader.num_rows())
914            .with_offset(offset)
915            .with_limit(limit)
916            .build_limited()
917            .build();
918
919        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
920    }
921}
922
923struct ReaderRowGroups<T: ChunkReader> {
924    reader: Arc<T>,
925
926    metadata: Arc<ParquetMetaData>,
    /// Row group indices to scan
928    row_groups: Vec<usize>,
929}
930
931impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
932    fn num_rows(&self) -> usize {
933        let meta = self.metadata.row_groups();
934        self.row_groups
935            .iter()
936            .map(|x| meta[*x].num_rows() as usize)
937            .sum()
938    }
939
940    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
941        Ok(Box::new(ReaderPageIterator {
942            column_idx: i,
943            reader: self.reader.clone(),
944            metadata: self.metadata.clone(),
945            row_groups: self.row_groups.clone().into_iter(),
946        }))
947    }
948}
949
950struct ReaderPageIterator<T: ChunkReader> {
951    reader: Arc<T>,
952    column_idx: usize,
953    row_groups: std::vec::IntoIter<usize>,
954    metadata: Arc<ParquetMetaData>,
955}
956
957impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
958    /// Return the next SerializedPageReader
959    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
960        let rg = self.metadata.row_group(rg_idx);
961        let column_chunk_metadata = rg.column(self.column_idx);
962        let offset_index = self.metadata.offset_index();
        // `offset_index` may not exist and `i[rg_idx]` will be empty.
        // To avoid a panic on `i[rg_idx][self.column_idx]`, we need to filter out empty `i[rg_idx]`.
965        let page_locations = offset_index
966            .filter(|i| !i[rg_idx].is_empty())
967            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
968        let total_rows = rg.num_rows() as usize;
969        let reader = self.reader.clone();
970
971        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
972            .add_crypto_context(
973                rg_idx,
974                self.column_idx,
975                self.metadata.as_ref(),
976                column_chunk_metadata,
977            )
978    }
979}
980
981impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
982    type Item = Result<Box<dyn PageReader>>;
983
984    fn next(&mut self) -> Option<Self::Item> {
985        let rg_idx = self.row_groups.next()?;
986        let page_reader = self
987            .next_page_reader(rg_idx)
988            .map(|page_reader| Box::new(page_reader) as _);
989        Some(page_reader)
990    }
991}
992
993impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
994
995/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
996/// read from a parquet data source
997///
998/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
999/// the buffered state (DataPages, etc) necessary to decode the parquet data into
1000/// Arrow arrays.
1001pub struct ParquetRecordBatchReader {
1002    array_reader: Box<dyn ArrayReader>,
1003    schema: SchemaRef,
1004    read_plan: ReadPlan,
1005}
1006
1007impl Debug for ParquetRecordBatchReader {
1008    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1009        f.debug_struct("ParquetRecordBatchReader")
1010            .field("array_reader", &"...")
1011            .field("schema", &self.schema)
1012            .field("read_plan", &self.read_plan)
1013            .finish()
1014    }
1015}
1016
1017impl Iterator for ParquetRecordBatchReader {
1018    type Item = Result<RecordBatch, ArrowError>;
1019
1020    fn next(&mut self) -> Option<Self::Item> {
1021        self.next_inner()
1022            .map_err(|arrow_err| arrow_err.into())
1023            .transpose()
1024    }
1025}
1026
1027impl ParquetRecordBatchReader {
1028    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1029    /// has reached the end of the file.
1030    ///
1031    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1032    /// simplify error handling with `?`
1033    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1034        let mut read_records = 0;
1035        let batch_size = self.batch_size();
1036        match self.read_plan.selection_mut() {
1037            Some(selection) => {
1038                while read_records < batch_size && !selection.is_empty() {
1039                    let front = selection.pop_front().unwrap();
1040                    if front.skip {
1041                        let skipped = self.array_reader.skip_records(front.row_count)?;
1042
1043                        if skipped != front.row_count {
1044                            return Err(general_err!(
1045                                "failed to skip rows, expected {}, got {}",
1046                                front.row_count,
1047                                skipped
1048                            ));
1049                        }
1050                        continue;
1051                    }
1052
                    // Currently, when a RowSelector with row_count = 0 is included, it is interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1055                    if front.row_count == 0 {
1056                        continue;
1057                    }
1058
1059                    // try to read record
1060                    let need_read = batch_size - read_records;
1061                    let to_read = match front.row_count.checked_sub(need_read) {
1062                        Some(remaining) if remaining != 0 => {
                            // If this selector covers more rows than we still need for this
                            // batch, read only `need_read` rows now and push the remainder
                            // back onto the selection. The `remaining != 0` guard avoids
                            // pushing an empty selector back onto the queue.
1065                            selection.push_front(RowSelector::select(remaining));
1066                            need_read
1067                        }
1068                        _ => front.row_count,
1069                    };
1070                    match self.array_reader.read_records(to_read)? {
1071                        0 => break,
1072                        rec => read_records += rec,
1073                    };
1074                }
1075            }
1076            None => {
1077                self.array_reader.read_records(batch_size)?;
1078            }
1079        };
1080
1081        let array = self.array_reader.consume_batch()?;
1082        let struct_array = array.as_struct_opt().ok_or_else(|| {
1083            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1084        })?;
1085
1086        Ok(if struct_array.len() > 0 {
1087            Some(RecordBatch::from(struct_array))
1088        } else {
1089            None
1090        })
1091    }
1092}
1093
1094impl RecordBatchReader for ParquetRecordBatchReader {
1095    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1096    ///
1097    /// Note that the schema metadata will be stripped here. See
1098    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
1099    fn schema(&self) -> SchemaRef {
1100        self.schema.clone()
1101    }
1102}
1103
1104impl ParquetRecordBatchReader {
1105    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1106    ///
1107    /// See [`ParquetRecordBatchReaderBuilder`] for more options
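    ///
    /// For illustration, a sketch (assuming `file` is an in-memory `Bytes`
    /// buffer containing Parquet data):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # fn example(file: Bytes) -> parquet::errors::Result<()> {
    /// let reader = ParquetRecordBatchReader::try_new(file, 1024)?;
    /// for batch in reader {
    ///     let batch = batch.unwrap();
    ///     println!("read {} rows", batch.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```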
1108    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1109        ParquetRecordBatchReaderBuilder::try_new(reader)?
1110            .with_batch_size(batch_size)
1111            .build()
1112    }
1113
1114    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1115    ///
    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
    /// higher-level interface for reading parquet data from a file
1118    pub fn try_new_with_row_groups(
1119        levels: &FieldLevels,
1120        row_groups: &dyn RowGroups,
1121        batch_size: usize,
1122        selection: Option<RowSelection>,
1123    ) -> Result<Self> {
1124        // note metrics are not supported in this API
1125        let metrics = ArrowReaderMetrics::disabled();
1126        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1127            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1128
1129        let read_plan = ReadPlanBuilder::new(batch_size)
1130            .with_selection(selection)
1131            .build();
1132
1133        Ok(Self {
1134            array_reader,
1135            schema: Arc::new(Schema::new(levels.fields.clone())),
1136            read_plan,
1137        })
1138    }
1139
1140    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1141    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1142    /// all rows will be returned
1143    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1144        let schema = match array_reader.get_data_type() {
1145            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1146            _ => unreachable!("Struct array reader's data type is not struct!"),
1147        };
1148
1149        Self {
1150            array_reader,
1151            schema: Arc::new(schema),
1152            read_plan,
1153        }
1154    }
1155
1156    #[inline(always)]
1157    pub(crate) fn batch_size(&self) -> usize {
1158        self.read_plan.batch_size()
1159    }
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164    use std::cmp::min;
1165    use std::collections::{HashMap, VecDeque};
1166    use std::fmt::Formatter;
1167    use std::fs::File;
1168    use std::io::Seek;
1169    use std::path::PathBuf;
1170    use std::sync::Arc;
1171
1172    use arrow_array::builder::*;
1173    use arrow_array::cast::AsArray;
1174    use arrow_array::types::{
1175        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1176        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1177        Time64MicrosecondType,
1178    };
1179    use arrow_array::*;
1180    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1181    use arrow_data::{ArrayData, ArrayDataBuilder};
1182    use arrow_schema::{
1183        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1184    };
1185    use arrow_select::concat::concat_batches;
1186    use bytes::Bytes;
1187    use half::f16;
1188    use num_traits::PrimInt;
1189    use rand::{Rng, RngCore, rng};
1190    use tempfile::tempfile;
1191
1192    use crate::arrow::arrow_reader::{
1193        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1194        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1195    };
1196    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1197    use crate::arrow::{ArrowWriter, ProjectionMask};
1198    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1199    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1200    use crate::data_type::{
1201        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1202        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1203    };
1204    use crate::errors::Result;
1205    use crate::file::metadata::ParquetMetaData;
1206    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1207    use crate::file::writer::SerializedFileWriter;
1208    use crate::schema::parser::parse_message_type;
1209    use crate::schema::types::{Type, TypePtr};
1210    use crate::util::test_common::rand_gen::RandGen;
1211
1212    #[test]
1213    fn test_arrow_reader_all_columns() {
1214        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1215
1216        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1217        let original_schema = Arc::clone(builder.schema());
1218        let reader = builder.build().unwrap();
1219
1220        // Verify that the schema was correctly parsed
1221        assert_eq!(original_schema.fields(), reader.schema().fields());
1222    }
1223
1224    #[test]
1225    fn test_arrow_reader_single_column() {
1226        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1227
1228        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1229        let original_schema = Arc::clone(builder.schema());
1230
1231        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1232        let reader = builder.with_projection(mask).build().unwrap();
1233
1234        // Verify that the schema was correctly parsed
1235        assert_eq!(1, reader.schema().fields().len());
1236        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1237    }
1238
1239    #[test]
1240    fn test_arrow_reader_single_column_by_name() {
1241        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1242
1243        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1244        let original_schema = Arc::clone(builder.schema());
1245
1246        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1247        let reader = builder.with_projection(mask).build().unwrap();
1248
1249        // Verify that the schema was correctly parsed
1250        assert_eq!(1, reader.schema().fields().len());
1251        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1252    }
1253
1254    #[test]
1255    fn test_null_column_reader_test() {
1256        let mut file = tempfile::tempfile().unwrap();
1257
1258        let schema = "
1259            message message {
1260                OPTIONAL INT32 int32;
1261            }
1262        ";
1263        let schema = Arc::new(parse_message_type(schema).unwrap());
1264
1265        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1266        generate_single_column_file_with_data::<Int32Type>(
1267            &[vec![], vec![]],
1268            Some(&def_levels),
1269            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1270            schema,
1271            Some(Field::new("int32", ArrowDataType::Null, true)),
1272            &Default::default(),
1273        )
1274        .unwrap();
1275
1276        file.rewind().unwrap();
1277
1278        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1279        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1280
1281        assert_eq!(batches.len(), 4);
1282        for batch in &batches[0..3] {
1283            assert_eq!(batch.num_rows(), 2);
1284            assert_eq!(batch.num_columns(), 1);
1285            assert_eq!(batch.column(0).null_count(), 2);
1286        }
1287
1288        assert_eq!(batches[3].num_rows(), 1);
1289        assert_eq!(batches[3].num_columns(), 1);
1290        assert_eq!(batches[3].column(0).null_count(), 1);
1291    }
1292
1293    #[test]
1294    fn test_primitive_single_column_reader_test() {
1295        run_single_column_reader_tests::<BoolType, _, BoolType>(
1296            2,
1297            ConvertedType::NONE,
1298            None,
1299            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1300            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1301        );
1302        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1303            2,
1304            ConvertedType::NONE,
1305            None,
1306            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1307            &[
1308                Encoding::PLAIN,
1309                Encoding::RLE_DICTIONARY,
1310                Encoding::DELTA_BINARY_PACKED,
1311                Encoding::BYTE_STREAM_SPLIT,
1312            ],
1313        );
1314        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1315            2,
1316            ConvertedType::NONE,
1317            None,
1318            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1319            &[
1320                Encoding::PLAIN,
1321                Encoding::RLE_DICTIONARY,
1322                Encoding::DELTA_BINARY_PACKED,
1323                Encoding::BYTE_STREAM_SPLIT,
1324            ],
1325        );
1326        run_single_column_reader_tests::<FloatType, _, FloatType>(
1327            2,
1328            ConvertedType::NONE,
1329            None,
1330            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1331            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1332        );
1333    }
1334
1335    #[test]
1336    fn test_unsigned_primitive_single_column_reader_test() {
1337        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1338            2,
1339            ConvertedType::UINT_32,
1340            Some(ArrowDataType::UInt32),
1341            |vals| {
1342                Arc::new(UInt32Array::from_iter(
1343                    vals.iter().map(|x| x.map(|x| x as u32)),
1344                ))
1345            },
1346            &[
1347                Encoding::PLAIN,
1348                Encoding::RLE_DICTIONARY,
1349                Encoding::DELTA_BINARY_PACKED,
1350            ],
1351        );
1352        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1353            2,
1354            ConvertedType::UINT_64,
1355            Some(ArrowDataType::UInt64),
1356            |vals| {
1357                Arc::new(UInt64Array::from_iter(
1358                    vals.iter().map(|x| x.map(|x| x as u64)),
1359                ))
1360            },
1361            &[
1362                Encoding::PLAIN,
1363                Encoding::RLE_DICTIONARY,
1364                Encoding::DELTA_BINARY_PACKED,
1365            ],
1366        );
1367    }
1368
1369    #[test]
1370    fn test_unsigned_roundtrip() {
1371        let schema = Arc::new(Schema::new(vec![
1372            Field::new("uint32", ArrowDataType::UInt32, true),
1373            Field::new("uint64", ArrowDataType::UInt64, true),
1374        ]));
1375
1376        let mut buf = Vec::with_capacity(1024);
1377        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1378
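        // Include values above i32::MAX / i64::MAX so the round trip exercises the
        // sign-bit reinterpretation on both the write and read paths.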
1379        let original = RecordBatch::try_new(
1380            schema,
1381            vec![
1382                Arc::new(UInt32Array::from_iter_values([
1383                    0,
1384                    i32::MAX as u32,
1385                    u32::MAX,
1386                ])),
1387                Arc::new(UInt64Array::from_iter_values([
1388                    0,
1389                    i64::MAX as u64,
1390                    u64::MAX,
1391                ])),
1392            ],
1393        )
1394        .unwrap();
1395
1396        writer.write(&original).unwrap();
1397        writer.close().unwrap();
1398
1399        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1400        let ret = reader.next().unwrap().unwrap();
1401        assert_eq!(ret, original);
1402
1403        // Check they can be downcast to the correct type
1404        ret.column(0)
1405            .as_any()
1406            .downcast_ref::<UInt32Array>()
1407            .unwrap();
1408
1409        ret.column(1)
1410            .as_any()
1411            .downcast_ref::<UInt64Array>()
1412            .unwrap();
1413    }
1414
1415    #[test]
1416    fn test_float16_roundtrip() -> Result<()> {
1417        let schema = Arc::new(Schema::new(vec![
1418            Field::new("float16", ArrowDataType::Float16, false),
1419            Field::new("float16-nullable", ArrowDataType::Float16, true),
1420        ]));
1421
1422        let mut buf = Vec::with_capacity(1024);
1423        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1424
1425        let original = RecordBatch::try_new(
1426            schema,
1427            vec![
1428                Arc::new(Float16Array::from_iter_values([
1429                    f16::EPSILON,
1430                    f16::MIN,
1431                    f16::MAX,
1432                    f16::NAN,
1433                    f16::INFINITY,
1434                    f16::NEG_INFINITY,
1435                    f16::ONE,
1436                    f16::NEG_ONE,
1437                    f16::ZERO,
1438                    f16::NEG_ZERO,
1439                    f16::E,
1440                    f16::PI,
1441                    f16::FRAC_1_PI,
1442                ])),
1443                Arc::new(Float16Array::from(vec![
1444                    None,
1445                    None,
1446                    None,
1447                    Some(f16::NAN),
1448                    Some(f16::INFINITY),
1449                    Some(f16::NEG_INFINITY),
1450                    None,
1451                    None,
1452                    None,
1453                    None,
1454                    None,
1455                    None,
1456                    Some(f16::FRAC_1_PI),
1457                ])),
1458            ],
1459        )?;
1460
1461        writer.write(&original)?;
1462        writer.close()?;
1463
1464        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1465        let ret = reader.next().unwrap()?;
1466        assert_eq!(ret, original);
1467
1468        // Ensure can be downcast to the correct type
1469        ret.column(0).as_primitive::<Float16Type>();
1470        ret.column(1).as_primitive::<Float16Type>();
1471
1472        Ok(())
1473    }
1474
1475    #[test]
1476    fn test_time_utc_roundtrip() -> Result<()> {
1477        let schema = Arc::new(Schema::new(vec![
1478            Field::new(
1479                "time_millis",
1480                ArrowDataType::Time32(TimeUnit::Millisecond),
1481                true,
1482            )
1483            .with_metadata(HashMap::from_iter(vec![(
1484                "adjusted_to_utc".to_string(),
1485                "".to_string(),
1486            )])),
1487            Field::new(
1488                "time_micros",
1489                ArrowDataType::Time64(TimeUnit::Microsecond),
1490                true,
1491            )
1492            .with_metadata(HashMap::from_iter(vec![(
1493                "adjusted_to_utc".to_string(),
1494                "".to_string(),
1495            )])),
1496        ]));
1497
1498        let mut buf = Vec::with_capacity(1024);
1499        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1500
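        // The values deliberately include a negative time and times past 24 hours; they
        // are written and read back verbatim, without any range validation.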
1501        let original = RecordBatch::try_new(
1502            schema,
1503            vec![
1504                Arc::new(Time32MillisecondArray::from(vec![
1505                    Some(-1),
1506                    Some(0),
1507                    Some(86_399_000),
1508                    Some(86_400_000),
1509                    Some(86_401_000),
1510                    None,
1511                ])),
1512                Arc::new(Time64MicrosecondArray::from(vec![
1513                    Some(-1),
1514                    Some(0),
1515                    Some(86_399 * 1_000_000),
1516                    Some(86_400 * 1_000_000),
1517                    Some(86_401 * 1_000_000),
1518                    None,
1519                ])),
1520            ],
1521        )?;
1522
1523        writer.write(&original)?;
1524        writer.close()?;
1525
1526        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1527        let ret = reader.next().unwrap()?;
1528        assert_eq!(ret, original);
1529
1530        // Ensure can be downcast to the correct type
1531        ret.column(0).as_primitive::<Time32MillisecondType>();
1532        ret.column(1).as_primitive::<Time64MicrosecondType>();
1533
1534        Ok(())
1535    }
1536
1537    #[test]
1538    fn test_date32_roundtrip() -> Result<()> {
1539        use arrow_array::Date32Array;
1540
1541        let schema = Arc::new(Schema::new(vec![Field::new(
1542            "date32",
1543            ArrowDataType::Date32,
1544            false,
1545        )]));
1546
1547        let mut buf = Vec::with_capacity(1024);
1548
1549        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1550
1551        let original = RecordBatch::try_new(
1552            schema,
1553            vec![Arc::new(Date32Array::from(vec![
1554                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1555            ]))],
1556        )?;
1557
1558        writer.write(&original)?;
1559        writer.close()?;
1560
1561        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1562        let ret = reader.next().unwrap()?;
1563        assert_eq!(ret, original);
1564
1565        // Ensure can be downcast to the correct type
1566        ret.column(0).as_primitive::<Date32Type>();
1567
1568        Ok(())
1569    }
1570
1571    #[test]
1572    fn test_date64_roundtrip() -> Result<()> {
1573        use arrow_array::Date64Array;
1574
1575        let schema = Arc::new(Schema::new(vec![
1576            Field::new("small-date64", ArrowDataType::Date64, false),
1577            Field::new("big-date64", ArrowDataType::Date64, false),
1578            Field::new("invalid-date64", ArrowDataType::Date64, false),
1579        ]));
1580
1581        let mut default_buf = Vec::with_capacity(1024);
1582        let mut coerce_buf = Vec::with_capacity(1024);
1583
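        // With coerce_types enabled, Date64 is coerced to the Parquet DATE logical type
        // (32-bit days), so only values that are exact multiples of a day and whose day
        // count fits in an i32 survive the round trip. The default writer keeps a 64-bit
        // millisecond representation and round-trips every value unchanged.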
1584        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1585
1586        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1587        let mut coerce_writer =
1588            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1589
1590        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1591
1592        let original = RecordBatch::try_new(
1593            schema,
1594            vec![
1595                // small-date64
1596                Arc::new(Date64Array::from(vec![
1597                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1598                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1599                    0,
1600                    1_000 * NUM_MILLISECONDS_IN_DAY,
1601                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1602                ])),
1603                // big-date64
1604                Arc::new(Date64Array::from(vec![
1605                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1606                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1607                    0,
1608                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1609                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1610                ])),
1611                // invalid-date64
1612                Arc::new(Date64Array::from(vec![
1613                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1614                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1615                    1,
1616                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1617                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1618                ])),
1619            ],
1620        )?;
1621
1622        default_writer.write(&original)?;
1623        coerce_writer.write(&original)?;
1624
1625        default_writer.close()?;
1626        coerce_writer.close()?;
1627
1628        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1629        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1630
1631        let default_ret = default_reader.next().unwrap()?;
1632        let coerce_ret = coerce_reader.next().unwrap()?;
1633
1634        // Roundtrip should be successful when default writer used
1635        assert_eq!(default_ret, original);
1636
1637        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1638        assert_eq!(coerce_ret.column(0), original.column(0));
1639        assert_ne!(coerce_ret.column(1), original.column(1));
1640        assert_ne!(coerce_ret.column(2), original.column(2));
1641
1642        // Ensure both can be downcast to the correct type
1643        default_ret.column(0).as_primitive::<Date64Type>();
1644        coerce_ret.column(0).as_primitive::<Date64Type>();
1645
1646        Ok(())
1647    }
1648    struct RandFixedLenGen {}
1649
1650    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1651        fn r#gen(len: i32) -> FixedLenByteArray {
1652            let mut v = vec![0u8; len as usize];
1653            rng().fill_bytes(&mut v);
1654            ByteArray::from(v).into()
1655        }
1656    }
1657
1658    #[test]
1659    fn test_fixed_length_binary_column_reader() {
1660        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1661            20,
1662            ConvertedType::NONE,
1663            None,
1664            |vals| {
1665                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1666                for val in vals {
1667                    match val {
1668                        Some(b) => builder.append_value(b).unwrap(),
1669                        None => builder.append_null(),
1670                    }
1671                }
1672                Arc::new(builder.finish())
1673            },
1674            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1675        );
1676    }
1677
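    // A Parquet INTERVAL value is a 12-byte FIXED_LEN_BYTE_ARRAY holding three
    // little-endian u32s: months, days and milliseconds. Arrow's IntervalDayTime has no
    // month component, so the converter below decodes only bytes 4..8 (days) and
    // 8..12 (milliseconds).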
1678    #[test]
1679    fn test_interval_day_time_column_reader() {
1680        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1681            12,
1682            ConvertedType::INTERVAL,
1683            None,
1684            |vals| {
1685                Arc::new(
1686                    vals.iter()
1687                        .map(|x| {
1688                            x.as_ref().map(|b| IntervalDayTime {
1689                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1690                                milliseconds: i32::from_le_bytes(
1691                                    b.as_ref()[8..12].try_into().unwrap(),
1692                                ),
1693                            })
1694                        })
1695                        .collect::<IntervalDayTimeArray>(),
1696                )
1697            },
1698            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1699        );
1700    }
1701
1702    #[test]
1703    fn test_int96_single_column_reader_test() {
1704        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1705
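        // INT96 is a legacy timestamp encoding (nanoseconds within the day plus a Julian
        // day number). Each entry below pairs an optional Arrow type hint with the
        // conversion used to build the expected array at that resolution.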
1706        type TypeHintAndConversionFunction =
1707            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1708
1709        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1710            // Test without a specified ArrowType hint.
1711            (None, |vals: &[Option<Int96>]| {
1712                Arc::new(TimestampNanosecondArray::from_iter(
1713                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1714                )) as ArrayRef
1715            }),
1716            // Test other TimeUnits as ArrowType hints.
1717            (
1718                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1719                |vals: &[Option<Int96>]| {
1720                    Arc::new(TimestampSecondArray::from_iter(
1721                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1722                    )) as ArrayRef
1723                },
1724            ),
1725            (
1726                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1727                |vals: &[Option<Int96>]| {
1728                    Arc::new(TimestampMillisecondArray::from_iter(
1729                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1730                    )) as ArrayRef
1731                },
1732            ),
1733            (
1734                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1735                |vals: &[Option<Int96>]| {
1736                    Arc::new(TimestampMicrosecondArray::from_iter(
1737                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1738                    )) as ArrayRef
1739                },
1740            ),
1741            (
1742                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1743                |vals: &[Option<Int96>]| {
1744                    Arc::new(TimestampNanosecondArray::from_iter(
1745                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1746                    )) as ArrayRef
1747                },
1748            ),
1749            // Test another timezone with TimeUnit as ArrowType hints.
1750            (
1751                Some(ArrowDataType::Timestamp(
1752                    TimeUnit::Second,
1753                    Some(Arc::from("-05:00")),
1754                )),
1755                |vals: &[Option<Int96>]| {
1756                    Arc::new(
1757                        TimestampSecondArray::from_iter(
1758                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
1759                        )
1760                        .with_timezone("-05:00"),
1761                    ) as ArrayRef
1762                },
1763            ),
1764        ];
1765
1766        resolutions.iter().for_each(|(arrow_type, converter)| {
1767            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1768                2,
1769                ConvertedType::NONE,
1770                arrow_type.clone(),
1771                converter,
1772                encodings,
1773            );
1774        })
1775    }
1776
1777    #[test]
1778    fn test_int96_from_spark_file_with_provided_schema() {
        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
        // resolution for range compared to a nanosecond timestamp. We must provide a schema with
        // microsecond resolution for the Parquet reader to interpret these values correctly.
1782        use arrow_schema::DataType::Timestamp;
1783        let test_data = arrow::util::test_util::parquet_test_data();
1784        let path = format!("{test_data}/int96_from_spark.parquet");
1785        let file = File::open(path).unwrap();
1786
1787        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1788            "a",
1789            Timestamp(TimeUnit::Microsecond, None),
1790            true,
1791        )]));
1792        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1793
1794        let mut record_reader =
1795            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1796                .unwrap()
1797                .build()
1798                .unwrap();
1799
1800        let batch = record_reader.next().unwrap().unwrap();
1801        assert_eq!(batch.num_columns(), 1);
1802        let column = batch.column(0);
1803        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1804
1805        let expected = Arc::new(Int64Array::from(vec![
1806            Some(1704141296123456),
1807            Some(1704070800000000),
1808            Some(253402225200000000),
1809            Some(1735599600000000),
1810            None,
1811            Some(9089380393200000000),
1812        ]));
1813
1814        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1815        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1816        // anyway, so this should be a zero-copy non-modifying cast.
1817
1818        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1819        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1820
1821        assert_eq!(casted_timestamps.len(), expected.len());
1822
1823        casted_timestamps
1824            .iter()
1825            .zip(expected.iter())
1826            .for_each(|(lhs, rhs)| {
1827                assert_eq!(lhs, rhs);
1828            });
1829    }
1830
1831    #[test]
1832    fn test_int96_from_spark_file_without_provided_schema() {
        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
        // resolution for range compared to a nanosecond timestamp. Without a provided schema, some
        // values overflow when read at nanosecond resolution and result in garbage values.
1836        use arrow_schema::DataType::Timestamp;
1837        let test_data = arrow::util::test_util::parquet_test_data();
1838        let path = format!("{test_data}/int96_from_spark.parquet");
1839        let file = File::open(path).unwrap();
1840
1841        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1842            .unwrap()
1843            .build()
1844            .unwrap();
1845
1846        let batch = record_reader.next().unwrap().unwrap();
1847        assert_eq!(batch.num_columns(), 1);
1848        let column = batch.column(0);
1849        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1850
1851        let expected = Arc::new(Int64Array::from(vec![
1852            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
1853            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1854            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1855            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
1856            None,
1857            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1858        ]));
1859
1860        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1861        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1862        // anyway, so this should be a zero-copy non-modifying cast.
1863
1864        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1865        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1866
1867        assert_eq!(casted_timestamps.len(), expected.len());
1868
1869        casted_timestamps
1870            .iter()
1871            .zip(expected.iter())
1872            .for_each(|(lhs, rhs)| {
1873                assert_eq!(lhs, rhs);
1874            });
1875    }
1876
1877    struct RandUtf8Gen {}
1878
1879    impl RandGen<ByteArrayType> for RandUtf8Gen {
1880        fn r#gen(len: i32) -> ByteArray {
1881            Int32Type::r#gen(len).to_string().as_str().into()
1882        }
1883    }
1884
1885    #[test]
1886    fn test_utf8_single_column_reader_test() {
1887        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1888            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1889                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1890            })))
1891        }
1892
1893        let encodings = &[
1894            Encoding::PLAIN,
1895            Encoding::RLE_DICTIONARY,
1896            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1897            Encoding::DELTA_BYTE_ARRAY,
1898        ];
1899
1900        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1901            2,
1902            ConvertedType::NONE,
1903            None,
1904            |vals| {
1905                Arc::new(BinaryArray::from_iter(
1906                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1907                ))
1908            },
1909            encodings,
1910        );
1911
1912        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1913            2,
1914            ConvertedType::UTF8,
1915            None,
1916            string_converter::<i32>,
1917            encodings,
1918        );
1919
1920        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1921            2,
1922            ConvertedType::UTF8,
1923            Some(ArrowDataType::Utf8),
1924            string_converter::<i32>,
1925            encodings,
1926        );
1927
1928        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1929            2,
1930            ConvertedType::UTF8,
1931            Some(ArrowDataType::LargeUtf8),
1932            string_converter::<i64>,
1933            encodings,
1934        );
1935
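        // Int8/UInt8 dictionary keys can only index 128/256 distinct values, so the
        // larger configurations in the full test matrix would overflow them; run a
        // single smaller configuration per encoding instead.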
1936        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1937        for key in &small_key_types {
1938            for encoding in encodings {
1939                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1940                opts.encoding = *encoding;
1941
1942                let data_type =
1943                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1944
1945                // Cannot run full test suite as keys overflow, run small test instead
1946                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1947                    opts,
1948                    2,
1949                    ConvertedType::UTF8,
1950                    Some(data_type.clone()),
1951                    move |vals| {
1952                        let vals = string_converter::<i32>(vals);
1953                        arrow::compute::cast(&vals, &data_type).unwrap()
1954                    },
1955                );
1956            }
1957        }
1958
1959        let key_types = [
1960            ArrowDataType::Int16,
1961            ArrowDataType::UInt16,
1962            ArrowDataType::Int32,
1963            ArrowDataType::UInt32,
1964            ArrowDataType::Int64,
1965            ArrowDataType::UInt64,
1966        ];
1967
1968        for key in &key_types {
1969            let data_type =
1970                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1971
1972            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1973                2,
1974                ConvertedType::UTF8,
1975                Some(data_type.clone()),
1976                move |vals| {
1977                    let vals = string_converter::<i32>(vals);
1978                    arrow::compute::cast(&vals, &data_type).unwrap()
1979                },
1980                encodings,
1981            );
1982
1983            // https://github.com/apache/arrow-rs/issues/1179
1984            // let data_type = ArrowDataType::Dictionary(
1985            //     Box::new(key.clone()),
1986            //     Box::new(ArrowDataType::LargeUtf8),
1987            // );
1988            //
1989            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1990            //     2,
1991            //     ConvertedType::UTF8,
1992            //     Some(data_type.clone()),
1993            //     move |vals| {
1994            //         let vals = string_converter::<i64>(vals);
1995            //         arrow::compute::cast(&vals, &data_type).unwrap()
1996            //     },
1997            //     encodings,
1998            // );
1999        }
2000    }
2001
2002    #[test]
2003    fn test_decimal_nullable_struct() {
2004        let decimals = Decimal256Array::from_iter_values(
2005            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2006        );
2007
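        // Build an 8-row struct in which row 4 is null (null bitmap 0b11101111, LSB
        // first) while the child decimal column itself contains no nulls.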
2008        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2009            "decimals",
2010            decimals.data_type().clone(),
2011            false,
2012        )])))
2013        .len(8)
2014        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2015        .child_data(vec![decimals.into_data()])
2016        .build()
2017        .unwrap();
2018
2019        let written =
2020            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2021                .unwrap();
2022
2023        let mut buffer = Vec::with_capacity(1024);
2024        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2025        writer.write(&written).unwrap();
2026        writer.close().unwrap();
2027
2028        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2029            .unwrap()
2030            .collect::<Result<Vec<_>, _>>()
2031            .unwrap();
2032
2033        assert_eq!(&written.slice(0, 3), &read[0]);
2034        assert_eq!(&written.slice(3, 3), &read[1]);
2035        assert_eq!(&written.slice(6, 2), &read[2]);
2036    }
2037
2038    #[test]
2039    fn test_int32_nullable_struct() {
2040        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2041        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2042            "int32",
2043            int32.data_type().clone(),
2044            false,
2045        )])))
2046        .len(8)
2047        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2048        .child_data(vec![int32.into_data()])
2049        .build()
2050        .unwrap();
2051
2052        let written =
2053            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2054                .unwrap();
2055
2056        let mut buffer = Vec::with_capacity(1024);
2057        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2058        writer.write(&written).unwrap();
2059        writer.close().unwrap();
2060
2061        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2062            .unwrap()
2063            .collect::<Result<Vec<_>, _>>()
2064            .unwrap();
2065
2066        assert_eq!(&written.slice(0, 3), &read[0]);
2067        assert_eq!(&written.slice(3, 3), &read[1]);
2068        assert_eq!(&written.slice(6, 2), &read[2]);
2069    }
2070
2071    #[test]
2072    fn test_decimal_list() {
2073        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2074
2075        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2076        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2077            decimals.data_type().clone(),
2078            false,
2079        ))))
2080        .len(7)
2081        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2082        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2083        .child_data(vec![decimals.into_data()])
2084        .build()
2085        .unwrap();
2086
2087        let written =
2088            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2089                .unwrap();
2090
2091        let mut buffer = Vec::with_capacity(1024);
2092        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2093        writer.write(&written).unwrap();
2094        writer.close().unwrap();
2095
2096        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2097            .unwrap()
2098            .collect::<Result<Vec<_>, _>>()
2099            .unwrap();
2100
2101        assert_eq!(&written.slice(0, 3), &read[0]);
2102        assert_eq!(&written.slice(3, 3), &read[1]);
2103        assert_eq!(&written.slice(6, 1), &read[2]);
2104    }
2105
2106    #[test]
2107    fn test_read_decimal_file() {
2108        use arrow_array::Decimal128Array;
2109        let testdata = arrow::util::test_util::parquet_test_data();
2110        let file_variants = vec![
2111            ("byte_array", 4),
2112            ("fixed_length", 25),
2113            ("int32", 4),
2114            ("int64", 10),
2115        ];
2116        for (prefix, target_precision) in file_variants {
2117            let path = format!("{testdata}/{prefix}_decimal.parquet");
2118            let file = File::open(path).unwrap();
2119            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2120
2121            let batch = record_reader.next().unwrap().unwrap();
2122            assert_eq!(batch.num_rows(), 24);
2123            let col = batch
2124                .column(0)
2125                .as_any()
2126                .downcast_ref::<Decimal128Array>()
2127                .unwrap();
2128
2129            let expected = 1..25;
2130
2131            assert_eq!(col.precision(), target_precision);
2132            assert_eq!(col.scale(), 2);
2133
2134            for (i, v) in expected.enumerate() {
2135                assert_eq!(col.value(i), v * 100_i128);
2136            }
2137        }
2138    }
2139
2140    #[test]
2141    fn test_read_float16_nonzeros_file() {
2142        use arrow_array::Float16Array;
2143        let testdata = arrow::util::test_util::parquet_test_data();
2144        // see https://github.com/apache/parquet-testing/pull/40
2145        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2146        let file = File::open(path).unwrap();
2147        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2148
2149        let batch = record_reader.next().unwrap().unwrap();
2150        assert_eq!(batch.num_rows(), 8);
2151        let col = batch
2152            .column(0)
2153            .as_any()
2154            .downcast_ref::<Float16Array>()
2155            .unwrap();
2156
2157        let f16_two = f16::ONE + f16::ONE;
2158
2159        assert_eq!(col.null_count(), 1);
2160        assert!(col.is_null(0));
2161        assert_eq!(col.value(1), f16::ONE);
2162        assert_eq!(col.value(2), -f16_two);
2163        assert!(col.value(3).is_nan());
2164        assert_eq!(col.value(4), f16::ZERO);
2165        assert!(col.value(4).is_sign_positive());
2166        assert_eq!(col.value(5), f16::NEG_ONE);
2167        assert_eq!(col.value(6), f16::NEG_ZERO);
2168        assert!(col.value(6).is_sign_negative());
2169        assert_eq!(col.value(7), f16_two);
2170    }
2171
2172    #[test]
2173    fn test_read_float16_zeros_file() {
2174        use arrow_array::Float16Array;
2175        let testdata = arrow::util::test_util::parquet_test_data();
2176        // see https://github.com/apache/parquet-testing/pull/40
2177        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2178        let file = File::open(path).unwrap();
2179        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2180
2181        let batch = record_reader.next().unwrap().unwrap();
2182        assert_eq!(batch.num_rows(), 3);
2183        let col = batch
2184            .column(0)
2185            .as_any()
2186            .downcast_ref::<Float16Array>()
2187            .unwrap();
2188
2189        assert_eq!(col.null_count(), 1);
2190        assert!(col.is_null(0));
2191        assert_eq!(col.value(1), f16::ZERO);
2192        assert!(col.value(1).is_sign_positive());
2193        assert!(col.value(2).is_nan());
2194    }
2195
2196    #[test]
2197    fn test_read_float32_float64_byte_stream_split() {
2198        let path = format!(
2199            "{}/byte_stream_split.zstd.parquet",
2200            arrow::util::test_util::parquet_test_data(),
2201        );
2202        let file = File::open(path).unwrap();
2203        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2204
2205        let mut row_count = 0;
2206        for batch in record_reader {
2207            let batch = batch.unwrap();
2208            row_count += batch.num_rows();
2209            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2210            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2211
2212            // This file contains floats from a standard normal distribution
2213            for &x in f32_col.values() {
2214                assert!(x > -10.0);
2215                assert!(x < 10.0);
2216            }
2217            for &x in f64_col.values() {
2218                assert!(x > -10.0);
2219                assert!(x < 10.0);
2220            }
2221        }
2222        assert_eq!(row_count, 300);
2223    }
2224
2225    #[test]
2226    fn test_read_extended_byte_stream_split() {
2227        let path = format!(
2228            "{}/byte_stream_split_extended.gzip.parquet",
2229            arrow::util::test_util::parquet_test_data(),
2230        );
2231        let file = File::open(path).unwrap();
2232        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2233
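        // byte_stream_split_extended.gzip.parquet stores each value set twice: once with
        // a conventional encoding and once with BYTE_STREAM_SPLIT. Decoding each adjacent
        // column pair must therefore produce identical values.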
2234        let mut row_count = 0;
2235        for batch in record_reader {
2236            let batch = batch.unwrap();
2237            row_count += batch.num_rows();
2238
2239            // 0,1 are f16
2240            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2241            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2242            assert_eq!(f16_col.len(), f16_bss.len());
2243            f16_col
2244                .iter()
2245                .zip(f16_bss.iter())
2246                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2247
2248            // 2,3 are f32
2249            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2250            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2251            assert_eq!(f32_col.len(), f32_bss.len());
2252            f32_col
2253                .iter()
2254                .zip(f32_bss.iter())
2255                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2256
2257            // 4,5 are f64
2258            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2259            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2260            assert_eq!(f64_col.len(), f64_bss.len());
2261            f64_col
2262                .iter()
2263                .zip(f64_bss.iter())
2264                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2265
2266            // 6,7 are i32
2267            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2268            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2269            assert_eq!(i32_col.len(), i32_bss.len());
2270            i32_col
2271                .iter()
2272                .zip(i32_bss.iter())
2273                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2274
2275            // 8,9 are i64
2276            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2277            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2278            assert_eq!(i64_col.len(), i64_bss.len());
2279            i64_col
2280                .iter()
2281                .zip(i64_bss.iter())
2282                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2283
2284            // 10,11 are FLBA(5)
2285            let flba_col = batch.column(10).as_fixed_size_binary();
2286            let flba_bss = batch.column(11).as_fixed_size_binary();
2287            assert_eq!(flba_col.len(), flba_bss.len());
2288            flba_col
2289                .iter()
2290                .zip(flba_bss.iter())
2291                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2292
2293            // 12,13 are FLBA(4) (decimal(7,3))
2294            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2295            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2296            assert_eq!(dec_col.len(), dec_bss.len());
2297            dec_col
2298                .iter()
2299                .zip(dec_bss.iter())
2300                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2301        }
2302        assert_eq!(row_count, 200);
2303    }
2304
2305    #[test]
2306    fn test_read_incorrect_map_schema_file() {
2307        let testdata = arrow::util::test_util::parquet_test_data();
2308        // see https://github.com/apache/parquet-testing/pull/47
2309        let path = format!("{testdata}/incorrect_map_schema.parquet");
2310        let file = File::open(path).unwrap();
2311        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2312
2313        let batch = record_reader.next().unwrap().unwrap();
2314        assert_eq!(batch.num_rows(), 1);
2315
2316        let expected_schema = Schema::new(vec![Field::new(
2317            "my_map",
2318            ArrowDataType::Map(
2319                Arc::new(Field::new(
2320                    "key_value",
2321                    ArrowDataType::Struct(Fields::from(vec![
2322                        Field::new("key", ArrowDataType::Utf8, false),
2323                        Field::new("value", ArrowDataType::Utf8, true),
2324                    ])),
2325                    false,
2326                )),
2327                false,
2328            ),
2329            true,
2330        )]);
2331        assert_eq!(batch.schema().as_ref(), &expected_schema);
2332
2333        assert_eq!(batch.num_rows(), 1);
2334        assert_eq!(batch.column(0).null_count(), 0);
2335        assert_eq!(
2336            batch.column(0).as_map().keys().as_ref(),
2337            &StringArray::from(vec!["parent", "name"])
2338        );
2339        assert_eq!(
2340            batch.column(0).as_map().values().as_ref(),
2341            &StringArray::from(vec!["another", "report"])
2342        );
2343    }
2344
2345    #[test]
2346    fn test_read_dict_fixed_size_binary() {
2347        let schema = Arc::new(Schema::new(vec![Field::new(
2348            "a",
2349            ArrowDataType::Dictionary(
2350                Box::new(ArrowDataType::UInt8),
2351                Box::new(ArrowDataType::FixedSizeBinary(8)),
2352            ),
2353            true,
2354        )]));
2355        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2356        let values = FixedSizeBinaryArray::try_from_iter(
2357            vec![
2358                (0u8..8u8).collect::<Vec<u8>>(),
2359                (24u8..32u8).collect::<Vec<u8>>(),
2360            ]
2361            .into_iter(),
2362        )
2363        .unwrap();
2364        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2365        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2366
2367        let mut buffer = Vec::with_capacity(1024);
2368        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2369        writer.write(&batch).unwrap();
2370        writer.close().unwrap();
2371        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2372            .unwrap()
2373            .collect::<Result<Vec<_>, _>>()
2374            .unwrap();
2375
2376        assert_eq!(read.len(), 1);
2377        assert_eq!(&batch, &read[0])
2378    }
2379
2380    #[test]
2381    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        // The `StructArrayReader` checks the definition and repetition levels of the first
        // child column in the struct to determine the struct's nullability. If that first
        // column is read by a `ByteArrayDictionaryReader`, we need to ensure that the
        // nullability is interpreted correctly from the rep/def level buffers managed by
        // that array reader.
2387
2388        let struct_fields = Fields::from(vec![
2389            Field::new(
2390                "city",
2391                ArrowDataType::Dictionary(
2392                    Box::new(ArrowDataType::UInt8),
2393                    Box::new(ArrowDataType::Utf8),
2394                ),
2395                true,
2396            ),
2397            Field::new("name", ArrowDataType::Utf8, true),
2398        ]);
2399        let schema = Arc::new(Schema::new(vec![Field::new(
2400            "items",
2401            ArrowDataType::Struct(struct_fields.clone()),
2402            true,
2403        )]));
2404
2405        let items_arr = StructArray::new(
2406            struct_fields,
2407            vec![
2408                Arc::new(DictionaryArray::new(
2409                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2410                    Arc::new(StringArray::from_iter_values(vec![
2411                        "quebec",
2412                        "fredericton",
2413                        "halifax",
2414                    ])),
2415                )),
2416                Arc::new(StringArray::from_iter_values(vec![
2417                    "albert", "terry", "lance", "", "tim",
2418                ])),
2419            ],
2420            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2421        );
2422
2423        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2424        let mut buffer = Vec::with_capacity(1024);
2425        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2426        writer.write(&batch).unwrap();
2427        writer.close().unwrap();
2428        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2429            .unwrap()
2430            .collect::<Result<Vec<_>, _>>()
2431            .unwrap();
2432
2433        assert_eq!(read.len(), 1);
2434        assert_eq!(&batch, &read[0])
2435    }
2436
2437    /// Parameters for single_column_reader_test
2438    #[derive(Clone)]
2439    struct TestOptions {
        /// Number of row groups to write to parquet (total rows written =
        /// num_row_groups * num_rows)
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
2445        /// Size of batches to read back
2446        record_batch_size: usize,
        /// Percentage of nulls in the column, or None if the column is required (non-nullable)
2448        null_percent: Option<usize>,
2449        /// Set write batch size
2450        ///
2451        /// This is the number of rows that are written at once to a page and
2452        /// therefore acts as a bound on the page granularity of a row group
2453        write_batch_size: usize,
2454        /// Maximum size of page in bytes
2455        max_data_page_size: usize,
2456        /// Maximum size of dictionary page in bytes
2457        max_dict_page_size: usize,
2458        /// Writer version
2459        writer_version: WriterVersion,
2460        /// Enabled statistics
2461        enabled_statistics: EnabledStatistics,
2462        /// Encoding
2463        encoding: Encoding,
2464        /// row selections and total selected row count
2465        row_selections: Option<(RowSelection, usize)>,
2466        /// row filter
2467        row_filter: Option<Vec<bool>>,
2468        /// limit
2469        limit: Option<usize>,
2470        /// offset
2471        offset: Option<usize>,
2472    }
2473
    /// Manually implement `Debug` to avoid printing the entire contents of `row_selections` and `row_filter`
2475    impl std::fmt::Debug for TestOptions {
2476        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2477            f.debug_struct("TestOptions")
2478                .field("num_row_groups", &self.num_row_groups)
2479                .field("num_rows", &self.num_rows)
2480                .field("record_batch_size", &self.record_batch_size)
2481                .field("null_percent", &self.null_percent)
2482                .field("write_batch_size", &self.write_batch_size)
2483                .field("max_data_page_size", &self.max_data_page_size)
2484                .field("max_dict_page_size", &self.max_dict_page_size)
2485                .field("writer_version", &self.writer_version)
2486                .field("enabled_statistics", &self.enabled_statistics)
2487                .field("encoding", &self.encoding)
2488                .field("row_selections", &self.row_selections.is_some())
2489                .field("row_filter", &self.row_filter.is_some())
2490                .field("limit", &self.limit)
2491                .field("offset", &self.offset)
2492                .finish()
2493        }
2494    }
2495
2496    impl Default for TestOptions {
2497        fn default() -> Self {
2498            Self {
2499                num_row_groups: 2,
2500                num_rows: 100,
2501                record_batch_size: 15,
2502                null_percent: None,
2503                write_batch_size: 64,
2504                max_data_page_size: 1024 * 1024,
2505                max_dict_page_size: 1024 * 1024,
2506                writer_version: WriterVersion::PARQUET_1_0,
2507                enabled_statistics: EnabledStatistics::Page,
2508                encoding: Encoding::PLAIN,
2509                row_selections: None,
2510                row_filter: None,
2511                limit: None,
2512                offset: None,
2513            }
2514        }
2515    }
2516
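    // TestOptions is composed via the builder-style helpers below, e.g.
    // `TestOptions::new(2, 256, 93).with_null_percent(25).with_row_selections()`.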
2517    impl TestOptions {
2518        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2519            Self {
2520                num_row_groups,
2521                num_rows,
2522                record_batch_size,
2523                ..Default::default()
2524            }
2525        }
2526
2527        fn with_null_percent(self, null_percent: usize) -> Self {
2528            Self {
2529                null_percent: Some(null_percent),
2530                ..self
2531            }
2532        }
2533
2534        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2535            Self {
2536                max_data_page_size,
2537                ..self
2538            }
2539        }
2540
2541        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2542            Self {
2543                max_dict_page_size,
2544                ..self
2545            }
2546        }
2547
2548        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2549            Self {
2550                enabled_statistics,
2551                ..self
2552            }
2553        }
2554
2555        fn with_row_selections(self) -> Self {
2556            assert!(self.row_filter.is_none(), "Must set row selection first");
2557
2558            let mut rng = rng();
2559            let step = rng.random_range(self.record_batch_size..self.num_rows);
2560            let row_selections = create_test_selection(
2561                step,
2562                self.num_row_groups * self.num_rows,
2563                rng.random::<bool>(),
2564            );
2565            Self {
2566                row_selections: Some(row_selections),
2567                ..self
2568            }
2569        }
2570
2571        fn with_row_filter(self) -> Self {
2572            let row_count = match &self.row_selections {
2573                Some((_, count)) => *count,
2574                None => self.num_row_groups * self.num_rows,
2575            };
2576
2577            let mut rng = rng();
2578            Self {
2579                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2580                ..self
2581            }
2582        }
2583
2584        fn with_limit(self, limit: usize) -> Self {
2585            Self {
2586                limit: Some(limit),
2587                ..self
2588            }
2589        }
2590
2591        fn with_offset(self, offset: usize) -> Self {
2592            Self {
2593                offset: Some(offset),
2594                ..self
2595            }
2596        }
2597
2598        fn writer_props(&self) -> WriterProperties {
2599            let builder = WriterProperties::builder()
2600                .set_data_page_size_limit(self.max_data_page_size)
2601                .set_write_batch_size(self.write_batch_size)
2602                .set_writer_version(self.writer_version)
2603                .set_statistics_enabled(self.enabled_statistics);
2604
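            // Dictionary encodings cannot be requested via `set_encoding`; enable the
            // dictionary and let the writer pick the dictionary encoding instead. For
            // all other encodings, disable the dictionary so the requested encoding is
            // actually used for the data pages.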
2605            let builder = match self.encoding {
2606                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2607                    .set_dictionary_enabled(true)
2608                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2609                _ => builder
2610                    .set_dictionary_enabled(false)
2611                    .set_encoding(self.encoding),
2612            };
2613
2614            builder.build()
2615        }
2616    }
2617
    /// Create parquet files and then read them back using
    /// `ParquetRecordBatchReader`, once for each combination of a standard set
    /// of `TestOptions`, writer version and requested encoding.
    ///
    /// `rand_max` represents the maximum size of value to pass to the
    /// value generator
2624    fn run_single_column_reader_tests<T, F, G>(
2625        rand_max: i32,
2626        converted_type: ConvertedType,
2627        arrow_type: Option<ArrowDataType>,
2628        converter: F,
2629        encodings: &[Encoding],
2630    ) where
2631        T: DataType,
2632        G: RandGen<T>,
2633        F: Fn(&[Option<T::T>]) -> ArrayRef,
2634    {
2635        let all_options = vec![
            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each).
2638            TestOptions::new(2, 100, 15),
            // choose record_batch_size (5) so batches sometimes fall
            // on row group boundaries (3 row groups of 25 rows
            // each). Tests buffer refilling edge cases.
2643            TestOptions::new(3, 25, 5),
2644            // Choose record_batch_size (25) so all batches fall
2645            // exactly on row group boundary (25). Tests buffer
2646            // refilling edge cases.
2647            TestOptions::new(4, 100, 25),
2648            // Set maximum page size so row groups have multiple pages
2649            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2650            // Set small dictionary page size to test dictionary fallback
2651            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2652            // Test optional but with no nulls
2653            TestOptions::new(2, 256, 127).with_null_percent(0),
2654            // Test optional with nulls
2655            TestOptions::new(2, 256, 93).with_null_percent(25),
2656            // Test with limit of 0
2657            TestOptions::new(4, 100, 25).with_limit(0),
2658            // Test with limit of 50
2659            TestOptions::new(4, 100, 25).with_limit(50),
2660            // Test with limit equal to number of rows
            TestOptions::new(4, 100, 25).with_limit(100),
2662            // Test with limit larger than number of rows
2663            TestOptions::new(4, 100, 25).with_limit(101),
            // Test with limit + offset less than number of rows
2665            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2666            // Test with limit + offset equal to number of rows
2667            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2668            // Test with limit + offset larger than number of rows
2669            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2670            // Test with no page-level statistics
2671            TestOptions::new(2, 256, 91)
2672                .with_null_percent(25)
2673                .with_enabled_statistics(EnabledStatistics::Chunk),
2674            // Test with no statistics
2675            TestOptions::new(2, 256, 91)
2676                .with_null_percent(25)
2677                .with_enabled_statistics(EnabledStatistics::None),
2678            // Test with all null
2679            TestOptions::new(2, 128, 91)
2680                .with_null_percent(100)
2681                .with_enabled_statistics(EnabledStatistics::None),
2682            // Test skip
2683
            // choose record_batch_size (15) so batches cross row
            // group boundaries (2 row groups of 100 rows each).
2686            TestOptions::new(2, 100, 15).with_row_selections(),
            // choose record_batch_size (5) so batches sometimes fall
            // on row group boundaries (3 row groups of 25 rows
            // each). Tests buffer refilling edge cases.
2691            TestOptions::new(3, 25, 5).with_row_selections(),
2692            // Choose record_batch_size (25) so all batches fall
2693            // exactly on row group boundary (25). Tests buffer
2694            // refilling edge cases.
2695            TestOptions::new(4, 100, 25).with_row_selections(),
2696            // Set maximum page size so row groups have multiple pages
2697            TestOptions::new(3, 256, 73)
2698                .with_max_data_page_size(128)
2699                .with_row_selections(),
2700            // Set small dictionary page size to test dictionary fallback
2701            TestOptions::new(3, 256, 57)
2702                .with_max_dict_page_size(128)
2703                .with_row_selections(),
2704            // Test optional but with no nulls
2705            TestOptions::new(2, 256, 127)
2706                .with_null_percent(0)
2707                .with_row_selections(),
2708            // Test optional with nulls
2709            TestOptions::new(2, 256, 93)
2710                .with_null_percent(25)
2711                .with_row_selections(),
2712            // Test optional with nulls
2713            TestOptions::new(2, 256, 93)
2714                .with_null_percent(25)
2715                .with_row_selections()
2716                .with_limit(10),
2717            // Test optional with nulls
2718            TestOptions::new(2, 256, 93)
2719                .with_null_percent(25)
2720                .with_row_selections()
2721                .with_offset(20)
2722                .with_limit(10),
2723            // Test filter
2724
2725            // Test with row filter
2726            TestOptions::new(4, 100, 25).with_row_filter(),
2727            // Test with row selection and row filter
2728            TestOptions::new(4, 100, 25)
2729                .with_row_selections()
2730                .with_row_filter(),
            // Test with nulls and row filter and small pages
2732            TestOptions::new(2, 256, 93)
2733                .with_null_percent(25)
2734                .with_max_data_page_size(10)
2735                .with_row_filter(),
            // Test with nulls, row selection, row filter and small pages
2737            TestOptions::new(2, 256, 93)
2738                .with_null_percent(25)
2739                .with_max_data_page_size(10)
2740                .with_row_selections()
2741                .with_row_filter(),
2742            // Test with row selection and no offset index and small pages
2743            TestOptions::new(2, 256, 93)
2744                .with_enabled_statistics(EnabledStatistics::None)
2745                .with_max_data_page_size(10)
2746                .with_row_selections(),
2747        ];
2748
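        // Run every option set above against both writer versions and every encoding under test.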
2749        all_options.into_iter().for_each(|opts| {
2750            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2751                for encoding in encodings {
2752                    let opts = TestOptions {
2753                        writer_version,
2754                        encoding: *encoding,
2755                        ..opts.clone()
2756                    };
2757
2758                    single_column_reader_test::<T, _, G>(
2759                        opts,
2760                        rand_max,
2761                        converted_type,
2762                        arrow_type.clone(),
2763                        &converter,
2764                    )
2765                }
2766            }
2767        });
2768    }
2769
2770    /// Create a parquet file and then read it back with a
2771    /// `ParquetRecordBatchReader` built from the parameters described in
2772    /// `opts`.
2773    fn single_column_reader_test<T, F, G>(
2774        opts: TestOptions,
2775        rand_max: i32,
2776        converted_type: ConvertedType,
2777        arrow_type: Option<ArrowDataType>,
2778        converter: F,
2779    ) where
2780        T: DataType,
2781        G: RandGen<T>,
2782        F: Fn(&[Option<T::T>]) -> ArrayRef,
2783    {
2784        // Print out options to facilitate debugging failures on CI
2785        println!(
2786            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2787            T::get_physical_type(),
2788            converted_type,
2789            arrow_type,
2790            opts
2791        );
2792
2793        // Generate def_levels according to null_percent
2794        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2795            Some(null_percent) => {
2796                let mut rng = rng();
2797
2798                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2799                    .map(|_| {
2800                        std::iter::from_fn(|| {
2801                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2802                        })
2803                        .take(opts.num_rows)
2804                        .collect()
2805                    })
2806                    .collect();
2807                (Repetition::OPTIONAL, Some(def_levels))
2808            }
2809            None => (Repetition::REQUIRED, None),
2810        };
2811
2812        // Generate random table data
2813        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2814            .map(|idx| {
2815                let null_count = match def_levels.as_ref() {
2816                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2817                    None => 0,
2818                };
2819                G::gen_vec(rand_max, opts.num_rows - null_count)
2820            })
2821            .collect();
2822
2823        let len = match T::get_physical_type() {
2824            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2825            crate::basic::Type::INT96 => 12,
2826            _ => -1,
2827        };
2828
2829        let fields = vec![Arc::new(
2830            Type::primitive_type_builder("leaf", T::get_physical_type())
2831                .with_repetition(repetition)
2832                .with_converted_type(converted_type)
2833                .with_length(len)
2834                .build()
2835                .unwrap(),
2836        )];
2837
2838        let schema = Arc::new(
2839            Type::group_type_builder("test_schema")
2840                .with_fields(fields)
2841                .build()
2842                .unwrap(),
2843        );
2844
2845        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2846
2847        let mut file = tempfile::tempfile().unwrap();
2848
2849        generate_single_column_file_with_data::<T>(
2850            &values,
2851            def_levels.as_ref(),
2852            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2853            schema,
2854            arrow_field,
2855            &opts,
2856        )
2857        .unwrap();
2858
2859        file.rewind().unwrap();
2860
2861        let options = ArrowReaderOptions::new()
2862            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2863
2864        let mut builder =
2865            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2866
2867        let expected_data = match opts.row_selections {
2868            Some((selections, row_count)) => {
2869                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2870
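                // Note: despite its name, `skip_data` accumulates the *selected* rows;
                // `without_skip_data` is drained as each selector is applied.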
2871                let mut skip_data: Vec<Option<T::T>> = vec![];
2872                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2873                for select in dequeue {
2874                    if select.skip {
2875                        without_skip_data.drain(0..select.row_count);
2876                    } else {
2877                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2878                    }
2879                }
2880                builder = builder.with_row_selection(selections);
2881
2882                assert_eq!(skip_data.len(), row_count);
2883                skip_data
2884            }
2885            None => {
2886                // Get the flattened table data
2887                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2888                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2889                expected_data
2890            }
2891        };
2892
2893        let mut expected_data = match opts.row_filter {
2894            Some(filter) => {
2895                let expected_data = expected_data
2896                    .into_iter()
2897                    .zip(filter.iter())
2898                    .filter_map(|(d, f)| f.then(|| d))
2899                    .collect();
2900
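                // The predicate below replays the precomputed boolean mask batch by batch,
                // using `filter_offset` to track how many rows have already been evaluated.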
2901                let mut filter_offset = 0;
2902                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2903                    ProjectionMask::all(),
2904                    move |b| {
2905                        let array = BooleanArray::from_iter(
2906                            filter
2907                                .iter()
2908                                .skip(filter_offset)
2909                                .take(b.num_rows())
2910                                .map(|x| Some(*x)),
2911                        );
2912                        filter_offset += b.num_rows();
2913                        Ok(array)
2914                    },
2915                ))]);
2916
2917                builder = builder.with_row_filter(filter);
2918                expected_data
2919            }
2920            None => expected_data,
2921        };
2922
2923        if let Some(offset) = opts.offset {
2924            builder = builder.with_offset(offset);
2925            expected_data = expected_data.into_iter().skip(offset).collect();
2926        }
2927
2928        if let Some(limit) = opts.limit {
2929            builder = builder.with_limit(limit);
2930            expected_data = expected_data.into_iter().take(limit).collect();
2931        }
2932
2933        let mut record_reader = builder
2934            .with_batch_size(opts.record_batch_size)
2935            .build()
2936            .unwrap();
2937
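        // Read batches until the expected data is exhausted, then verify the reader returns None.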
2938        let mut total_read = 0;
2939        loop {
2940            let maybe_batch = record_reader.next();
2941            if total_read < expected_data.len() {
2942                let end = min(total_read + opts.record_batch_size, expected_data.len());
2943                let batch = maybe_batch.unwrap().unwrap();
2944                assert_eq!(end - total_read, batch.num_rows());
2945
2946                let a = converter(&expected_data[total_read..end]);
2947                let b = Arc::clone(batch.column(0));
2948
2949                assert_eq!(a.data_type(), b.data_type());
2950                assert_eq!(a.to_data(), b.to_data());
2951                assert_eq!(
2952                    a.as_any().type_id(),
2953                    b.as_any().type_id(),
2954                    "incorrect type ids"
2955                );
2956
2957                total_read = end;
2958            } else {
2959                assert!(maybe_batch.is_none());
2960                break;
2961            }
2962        }
2963    }
2964
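    /// Interleaves `values` with nulls according to `def_levels` (1 = value, 0 = null);
    /// with no def levels, every value is returned wrapped in `Some`.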
2965    fn gen_expected_data<T: DataType>(
2966        def_levels: Option<&Vec<Vec<i16>>>,
2967        values: &[Vec<T::T>],
2968    ) -> Vec<Option<T::T>> {
2969        let data: Vec<Option<T::T>> = match def_levels {
2970            Some(levels) => {
2971                let mut values_iter = values.iter().flatten();
2972                levels
2973                    .iter()
2974                    .flatten()
2975                    .map(|d| match d {
2976                        1 => Some(values_iter.next().cloned().unwrap()),
2977                        0 => None,
2978                        _ => unreachable!(),
2979                    })
2980                    .collect()
2981            }
2982            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2983        };
2984        data
2985    }
2986
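    /// Writes `values` (one inner `Vec` per row group) to `file` as a single-column
    /// parquet file, using the writer properties from `opts` and, if `field` is given,
    /// embedding the corresponding Arrow schema in the file metadata.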
2987    fn generate_single_column_file_with_data<T: DataType>(
2988        values: &[Vec<T::T>],
2989        def_levels: Option<&Vec<Vec<i16>>>,
2990        file: File,
2991        schema: TypePtr,
2992        field: Option<Field>,
2993        opts: &TestOptions,
2994    ) -> Result<ParquetMetaData> {
2995        let mut writer_props = opts.writer_props();
2996        if let Some(field) = field {
2997            let arrow_schema = Schema::new(vec![field]);
2998            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2999        }
3000
3001        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3002
3003        for (idx, v) in values.iter().enumerate() {
3004            let def_levels = def_levels.map(|d| d[idx].as_slice());
3005            let mut row_group_writer = writer.next_row_group()?;
3006            {
3007                let mut column_writer = row_group_writer
3008                    .next_column()?
3009                    .expect("Column writer is none!");
3010
3011                column_writer
3012                    .typed::<T>()
3013                    .write_batch(v, def_levels, None)?;
3014
3015                column_writer.close()?;
3016            }
3017            row_group_writer.close()?;
3018        }
3019
3020        writer.close()
3021    }
3022
3023    fn get_test_file(file_name: &str) -> File {
3024        let mut path = PathBuf::new();
3025        path.push(arrow::util::test_util::arrow_test_data());
3026        path.push(file_name);
3027
3028        File::open(path.as_path()).expect("File not found!")
3029    }
3030
3031    #[test]
3032    fn test_read_structs() {
3033        // This particular test file has columns of struct types where there is
3034        // a column that has the same name as one of the struct fields
3035        // (see: ARROW-11452)
3036        let testdata = arrow::util::test_util::parquet_test_data();
3037        let path = format!("{testdata}/nested_structs.rust.parquet");
3038        let file = File::open(&path).unwrap();
3039        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3040
3041        for batch in record_batch_reader {
3042            batch.unwrap();
3043        }
3044
3045        let file = File::open(&path).unwrap();
3046        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3047
3048        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3049        let projected_reader = builder
3050            .with_projection(mask)
3051            .with_batch_size(60)
3052            .build()
3053            .unwrap();
3054
3055        let expected_schema = Schema::new(vec![
3056            Field::new(
3057                "roll_num",
3058                ArrowDataType::Struct(Fields::from(vec![Field::new(
3059                    "count",
3060                    ArrowDataType::UInt64,
3061                    false,
3062                )])),
3063                false,
3064            ),
3065            Field::new(
3066                "PC_CUR",
3067                ArrowDataType::Struct(Fields::from(vec![
3068                    Field::new("mean", ArrowDataType::Int64, false),
3069                    Field::new("sum", ArrowDataType::Int64, false),
3070                ])),
3071                false,
3072            ),
3073        ]);
3074
3075        // Tests for #1652 and #1654
3076        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3077
3078        for batch in projected_reader {
3079            let batch = batch.unwrap();
3080            assert_eq!(batch.schema().as_ref(), &expected_schema);
3081        }
3082    }
3083
3084    #[test]
3085    // same as test_read_structs but constructs projection mask via column names
3086    fn test_read_structs_by_name() {
3087        let testdata = arrow::util::test_util::parquet_test_data();
3088        let path = format!("{testdata}/nested_structs.rust.parquet");
3089        let file = File::open(&path).unwrap();
3090        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3091
3092        for batch in record_batch_reader {
3093            batch.unwrap();
3094        }
3095
3096        let file = File::open(&path).unwrap();
3097        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3098
3099        let mask = ProjectionMask::columns(
3100            builder.parquet_schema(),
3101            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3102        );
3103        let projected_reader = builder
3104            .with_projection(mask)
3105            .with_batch_size(60)
3106            .build()
3107            .unwrap();
3108
3109        let expected_schema = Schema::new(vec![
3110            Field::new(
3111                "roll_num",
3112                ArrowDataType::Struct(Fields::from(vec![Field::new(
3113                    "count",
3114                    ArrowDataType::UInt64,
3115                    false,
3116                )])),
3117                false,
3118            ),
3119            Field::new(
3120                "PC_CUR",
3121                ArrowDataType::Struct(Fields::from(vec![
3122                    Field::new("mean", ArrowDataType::Int64, false),
3123                    Field::new("sum", ArrowDataType::Int64, false),
3124                ])),
3125                false,
3126            ),
3127        ]);
3128
3129        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3130
3131        for batch in projected_reader {
3132            let batch = batch.unwrap();
3133            assert_eq!(batch.schema().as_ref(), &expected_schema);
3134        }
3135    }
3136
3137    #[test]
3138    fn test_read_maps() {
3139        let testdata = arrow::util::test_util::parquet_test_data();
3140        let path = format!("{testdata}/nested_maps.snappy.parquet");
3141        let file = File::open(path).unwrap();
3142        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3143
3144        for batch in record_batch_reader {
3145            batch.unwrap();
3146        }
3147    }
3148
3149    #[test]
3150    fn test_nested_nullability() {
3151        let message_type = "message nested {
3152          OPTIONAL Group group {
3153            REQUIRED INT32 leaf;
3154          }
3155        }";
3156
3157        let file = tempfile::tempfile().unwrap();
3158        let schema = Arc::new(parse_message_type(message_type).unwrap());
3159
3160        {
3161            // Write using low-level parquet API (#1167)
3162            let mut writer =
3163                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3164                    .unwrap();
3165
3166            {
3167                let mut row_group_writer = writer.next_row_group().unwrap();
3168                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3169
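                // def_levels [0, 1, 0, 1]: rows 1 and 3 carry the leaf values 34 and 76,
                // while rows 0 and 2 have a null `group`.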
3170                column_writer
3171                    .typed::<Int32Type>()
3172                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3173                    .unwrap();
3174
3175                column_writer.close().unwrap();
3176                row_group_writer.close().unwrap();
3177            }
3178
3179            writer.close().unwrap();
3180        }
3181
3182        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3183        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3184
3185        let reader = builder.with_projection(mask).build().unwrap();
3186
3187        let expected_schema = Schema::new(vec![Field::new(
3188            "group",
3189            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3190            true,
3191        )]);
3192
3193        let batch = reader.into_iter().next().unwrap().unwrap();
3194        assert_eq!(batch.schema().as_ref(), &expected_schema);
3195        assert_eq!(batch.num_rows(), 4);
3196        assert_eq!(batch.column(0).null_count(), 2);
3197    }
3198
3199    #[test]
3200    fn test_invalid_utf8() {
3201        // A parquet file with one column containing invalid UTF-8 data
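        // The encoded value is b"he\xffllo" ("hello" with its third byte replaced by 0xFF),
        // which is not a valid UTF-8 sequence.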
3202        let data = vec![
3203            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3204            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3205            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3206            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3207            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3208            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3209            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3210            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3211            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3212            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3213            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3214            82, 49,
3215        ];
3216
3217        let file = Bytes::from(data);
3218        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3219
3220        let error = record_batch_reader.next().unwrap().unwrap_err();
3221
3222        assert!(
3223            error.to_string().contains("invalid utf-8 sequence"),
3224            "{}",
3225            error
3226        );
3227    }
3228
3229    #[test]
3230    fn test_invalid_utf8_string_array() {
3231        test_invalid_utf8_string_array_inner::<i32>();
3232    }
3233
3234    #[test]
3235    fn test_invalid_utf8_large_string_array() {
3236        test_invalid_utf8_string_array_inner::<i64>();
3237    }
3238
3239    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3240        let cases = [
3241            invalid_utf8_first_char::<O>(),
3242            invalid_utf8_first_char_long_strings::<O>(),
3243            invalid_utf8_later_char::<O>(),
3244            invalid_utf8_later_char_long_strings::<O>(),
3245            invalid_utf8_later_char_really_long_strings::<O>(),
3246            invalid_utf8_later_char_really_long_strings2::<O>(),
3247        ];
3248        for array in &cases {
3249            for encoding in STRING_ENCODINGS {
3250                // The data is not valid UTF-8, so we cannot safely construct a correct
3251                // StringArray; purposely create an invalid StringArray instead
3252                let array = unsafe {
3253                    GenericStringArray::<O>::new_unchecked(
3254                        array.offsets().clone(),
3255                        array.values().clone(),
3256                        array.nulls().cloned(),
3257                    )
3258                };
3259                let data_type = array.data_type().clone();
3260                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3261                let err = read_from_parquet(data).unwrap_err();
3262                let expected_err =
3263                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3264                assert!(
3265                    err.to_string().contains(expected_err),
3266                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3267                );
3268            }
3269        }
3270    }
3271
3272    #[test]
3273    fn test_invalid_utf8_string_view_array() {
3274        let cases = [
3275            invalid_utf8_first_char::<i32>(),
3276            invalid_utf8_first_char_long_strings::<i32>(),
3277            invalid_utf8_later_char::<i32>(),
3278            invalid_utf8_later_char_long_strings::<i32>(),
3279            invalid_utf8_later_char_really_long_strings::<i32>(),
3280            invalid_utf8_later_char_really_long_strings2::<i32>(),
3281        ];
3282
3283        for encoding in STRING_ENCODINGS {
3284            for array in &cases {
3285                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3286                let array = array.as_binary_view();
3287
3288                // The data is not valid UTF-8, so we cannot safely construct a correct
3289                // StringViewArray; purposely create an invalid StringViewArray instead
3290                let array = unsafe {
3291                    StringViewArray::new_unchecked(
3292                        array.views().clone(),
3293                        array.data_buffers().to_vec(),
3294                        array.nulls().cloned(),
3295                    )
3296                };
3297
3298                let data_type = array.data_type().clone();
3299                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3300                let err = read_from_parquet(data).unwrap_err();
3301                let expected_err =
3302                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3303                assert!(
3304                    err.to_string().contains(expected_err),
3305                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3306                );
3307            }
3308        }
3309    }
3310
3311    /// Encodings suitable for string data
3312    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3313        None,
3314        Some(Encoding::PLAIN),
3315        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3316        Some(Encoding::DELTA_BYTE_ARRAY),
3317    ];
3318
3319    /// Invalid UTF-8 sequence in the first character
3320    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3321    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3322
3323    /// Invalid UTF-8 sequence in a character other than the first
3324    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3325    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3326
3327    /// Returns a BinaryArray with invalid UTF8 data in the first character
3328    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3329        let valid: &[u8] = b"   ";
3330        let invalid = INVALID_UTF8_FIRST_CHAR;
3331        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3332    }
3333
3334    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3335    /// string larger than 12 bytes which is handled specially when reading
3336    /// `ByteViewArray`s
3337    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3338        let valid: &[u8] = b"   ";
3339        let mut invalid = vec![];
3340        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3341        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3342        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3343    }
3344
3345    /// Returns a BinaryArray with invalid UTF8 data in a character other than
3346    /// the first (this is checked in a special codepath)
3347    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3348        let valid: &[u8] = b"   ";
3349        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3350        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3351    }
3352
3353    /// Returns a BinaryArray with invalid UTF8 data in a character other than
3354    /// the first in a string larger than 12 bytes which is handled specially
3355    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3356    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3357        let valid: &[u8] = b"   ";
3358        let mut invalid = vec![];
3359        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3360        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3361        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3362    }
3363
3364    /// Returns a BinaryArray with invalid UTF8 data in a character other than
3365    /// the first in a string larger than 128 bytes which is handled specially
3366    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3367    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3368        let valid: &[u8] = b"   ";
3369        let mut invalid = vec![];
3370        for _ in 0..10 {
3371            // each instance is 38 bytes
3372            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3373        }
3374        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3375        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3376    }
3377
3378    /// Returns a BinaryArray with a short invalid UTF8 value (invalid in a character
3379    /// other than the first) mixed with valid values, including one longer than 128 bytes
3380    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3381        let valid: &[u8] = b"   ";
3382        let mut valid_long = vec![];
3383        for _ in 0..10 {
3384            // each instance is 38 bytes
3385            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3386        }
3387        let invalid = INVALID_UTF8_LATER_CHAR;
3388        GenericBinaryArray::<O>::from_iter(vec![
3389            None,
3390            Some(valid),
3391            Some(invalid),
3392            None,
3393            Some(&valid_long),
3394            Some(valid),
3395        ])
3396    }
3397
3398    /// Writes the array into a single-column parquet file with the specified
3399    /// encoding.
3400    ///
3401    /// If no encoding is specified, the default (dictionary) encoding is used
3402    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3403        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3404        let mut data = vec![];
3405        let schema = batch.schema();
3406        let props = encoding.map(|encoding| {
3407            WriterProperties::builder()
3408                // dictionary encoding must be disabled for the requested encoding to take effect
3409                .set_dictionary_enabled(false)
3410                .set_encoding(encoding)
3411                .build()
3412        });
3413
3414        {
3415            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3416            writer.write(&batch).unwrap();
3417            writer.flush().unwrap();
3418            writer.close().unwrap();
3419        };
3420        data
3421    }
3422
3423    /// Reads the parquet data back into record batches
3424    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3425        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3426            .unwrap()
3427            .build()
3428            .unwrap();
3429
3430        reader.collect()
3431    }
3432
3433    #[test]
3434    fn test_dictionary_preservation() {
3435        let fields = vec![Arc::new(
3436            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3437                .with_repetition(Repetition::OPTIONAL)
3438                .with_converted_type(ConvertedType::UTF8)
3439                .build()
3440                .unwrap(),
3441        )];
3442
3443        let schema = Arc::new(
3444            Type::group_type_builder("test_schema")
3445                .with_fields(fields)
3446                .build()
3447                .unwrap(),
3448        );
3449
3450        let dict_type = ArrowDataType::Dictionary(
3451            Box::new(ArrowDataType::Int32),
3452            Box::new(ArrowDataType::Utf8),
3453        );
3454
3455        let arrow_field = Field::new("leaf", dict_type, true);
3456
3457        let mut file = tempfile::tempfile().unwrap();
3458
3459        let values = vec![
3460            vec![
3461                ByteArray::from("hello"),
3462                ByteArray::from("a"),
3463                ByteArray::from("b"),
3464                ByteArray::from("d"),
3465            ],
3466            vec![
3467                ByteArray::from("c"),
3468                ByteArray::from("a"),
3469                ByteArray::from("b"),
3470            ],
3471        ];
3472
3473        let def_levels = vec![
3474            vec![1, 0, 0, 1, 0, 0, 1, 1],
3475            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3476        ];
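        // 8 + 9 = 17 rows across the two row groups; read back with a batch size of 3
        // this yields 6 batches (five of 3 rows and a final batch of 2).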
3477
3478        let opts = TestOptions {
3479            encoding: Encoding::RLE_DICTIONARY,
3480            ..Default::default()
3481        };
3482
3483        generate_single_column_file_with_data::<ByteArrayType>(
3484            &values,
3485            Some(&def_levels),
3486            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3487            schema,
3488            Some(arrow_field),
3489            &opts,
3490        )
3491        .unwrap();
3492
3493        file.rewind().unwrap();
3494
3495        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3496
3497        let batches = record_reader
3498            .collect::<Result<Vec<RecordBatch>, _>>()
3499            .unwrap();
3500
3501        assert_eq!(batches.len(), 6);
3502        assert!(batches.iter().all(|x| x.num_columns() == 1));
3503
3504        let row_counts = batches
3505            .iter()
3506            .map(|x| (x.num_rows(), x.column(0).null_count()))
3507            .collect::<Vec<_>>();
3508
3509        assert_eq!(
3510            row_counts,
3511            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3512        );
3513
3514        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3515
3516        // First and second batch in same row group -> same dictionary
3517        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3518        // Third batch spans the row group boundary -> a new dictionary is computed
3519        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3520        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3521        // Fourth, fifth and sixth from same row group -> same dictionary
3522        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3523        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3524    }
3525
3526    #[test]
3527    fn test_read_null_list() {
3528        let testdata = arrow::util::test_util::parquet_test_data();
3529        let path = format!("{testdata}/null_list.parquet");
3530        let file = File::open(path).unwrap();
3531        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3532
3533        let batch = record_batch_reader.next().unwrap().unwrap();
3534        assert_eq!(batch.num_rows(), 1);
3535        assert_eq!(batch.num_columns(), 1);
3536        assert_eq!(batch.column(0).len(), 1);
3537
3538        let list = batch
3539            .column(0)
3540            .as_any()
3541            .downcast_ref::<ListArray>()
3542            .unwrap();
3543        assert_eq!(list.len(), 1);
3544        assert!(list.is_valid(0));
3545
3546        let val = list.value(0);
3547        assert_eq!(val.len(), 0);
3548    }
3549
3550    #[test]
3551    fn test_null_schema_inference() {
3552        let testdata = arrow::util::test_util::parquet_test_data();
3553        let path = format!("{testdata}/null_list.parquet");
3554        let file = File::open(path).unwrap();
3555
3556        let arrow_field = Field::new(
3557            "emptylist",
3558            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3559            true,
3560        );
3561
3562        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3563        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3564        let schema = builder.schema();
3565        assert_eq!(schema.fields().len(), 1);
3566        assert_eq!(schema.field(0), &arrow_field);
3567    }
3568
3569    #[test]
3570    fn test_skip_metadata() {
3571        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3572        let field = Field::new("col", col.data_type().clone(), true);
3573
3574        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3575
3576        let metadata = [("key".to_string(), "value".to_string())]
3577            .into_iter()
3578            .collect();
3579
3580        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3581
3582        assert_ne!(schema_with_metadata, schema_without_metadata);
3583
3584        let batch =
3585            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3586
3587        let file = |version: WriterVersion| {
3588            let props = WriterProperties::builder()
3589                .set_writer_version(version)
3590                .build();
3591
3592            let file = tempfile().unwrap();
3593            let mut writer =
3594                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3595                    .unwrap();
3596            writer.write(&batch).unwrap();
3597            writer.close().unwrap();
3598            file
3599        };
3600
3601        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3602
3603        let v1_reader = file(WriterVersion::PARQUET_1_0);
3604        let v2_reader = file(WriterVersion::PARQUET_2_0);
3605
3606        let arrow_reader =
3607            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3608        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3609
3610        let reader =
3611            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3612                .unwrap()
3613                .build()
3614                .unwrap();
3615        assert_eq!(reader.schema(), schema_without_metadata);
3616
3617        let arrow_reader =
3618            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3619        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3620
3621        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3622            .unwrap()
3623            .build()
3624            .unwrap();
3625        assert_eq!(reader.schema(), schema_without_metadata);
3626    }
3627
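    /// Writes the given (column name, array) pairs to a temporary parquet file
    /// using `ArrowWriter` with default properties.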
3628    fn write_parquet_from_iter<I, F>(value: I) -> File
3629    where
3630        I: IntoIterator<Item = (F, ArrayRef)>,
3631        F: AsRef<str>,
3632    {
3633        let batch = RecordBatch::try_from_iter(value).unwrap();
3634        let file = tempfile().unwrap();
3635        let mut writer =
3636            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3637        writer.write(&batch).unwrap();
3638        writer.close().unwrap();
3639        file
3640    }
3641
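    /// Writes `value` to a parquet file, then asserts that constructing a reader
    /// with the supplied `schema` fails with `expected_error`.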
3642    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3643    where
3644        I: IntoIterator<Item = (F, ArrayRef)>,
3645        F: AsRef<str>,
3646    {
3647        let file = write_parquet_from_iter(value);
3648        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3649        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3650            file.try_clone().unwrap(),
3651            options_with_schema,
3652        );
3653        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3654    }
3655
3656    #[test]
3657    fn test_schema_too_few_columns() {
3658        run_schema_test_with_error(
3659            vec![
3660                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3661                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3662            ],
3663            Arc::new(Schema::new(vec![Field::new(
3664                "int64",
3665                ArrowDataType::Int64,
3666                false,
3667            )])),
3668            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3669        );
3670    }
3671
3672    #[test]
3673    fn test_schema_too_many_columns() {
3674        run_schema_test_with_error(
3675            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3676            Arc::new(Schema::new(vec![
3677                Field::new("int64", ArrowDataType::Int64, false),
3678                Field::new("int32", ArrowDataType::Int32, false),
3679            ])),
3680            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3681        );
3682    }
3683
3684    #[test]
3685    fn test_schema_mismatched_column_names() {
3686        run_schema_test_with_error(
3687            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3688            Arc::new(Schema::new(vec![Field::new(
3689                "other",
3690                ArrowDataType::Int64,
3691                false,
3692            )])),
3693            "Arrow: incompatible arrow schema, expected field named int64 got other",
3694        );
3695    }
3696
3697    #[test]
3698    fn test_schema_incompatible_columns() {
3699        run_schema_test_with_error(
3700            vec![
3701                (
3702                    "col1_invalid",
3703                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3704                ),
3705                (
3706                    "col2_valid",
3707                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3708                ),
3709                (
3710                    "col3_invalid",
3711                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3712                ),
3713            ],
3714            Arc::new(Schema::new(vec![
3715                Field::new("col1_invalid", ArrowDataType::Int32, false),
3716                Field::new("col2_valid", ArrowDataType::Int32, false),
3717                Field::new("col3_invalid", ArrowDataType::Int32, false),
3718            ])),
3719            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3720        );
3721    }
3722
3723    #[test]
3724    fn test_one_incompatible_nested_column() {
3725        let nested_fields = Fields::from(vec![
3726            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3727            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3728        ]);
3729        let nested = StructArray::try_new(
3730            nested_fields,
3731            vec![
3732                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3733                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3734            ],
3735            None,
3736        )
3737        .expect("struct array");
3738        let supplied_nested_fields = Fields::from(vec![
3739            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3740            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3741        ]);
3742        run_schema_test_with_error(
3743            vec![
3744                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3745                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3746                ("nested", Arc::new(nested) as ArrayRef),
3747            ],
3748            Arc::new(Schema::new(vec![
3749                Field::new("col1", ArrowDataType::Int64, false),
3750                Field::new("col2", ArrowDataType::Int32, false),
3751                Field::new(
3752                    "nested",
3753                    ArrowDataType::Struct(supplied_nested_fields),
3754                    false,
3755                ),
3756            ])),
3757            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3758            requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
3759            but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
3760        );
3761    }
3762
3763    /// Return parquet data with a single column of utf8 strings
3764    fn utf8_parquet() -> Bytes {
3765        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3766        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3767        let props = None;
3768        // Write a parquet file with non-nullable strings
3769        let mut parquet_data = vec![];
3770        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3771        writer.write(&batch).unwrap();
3772        writer.close().unwrap();
3773        Bytes::from(parquet_data)
3774    }
3775
3776    #[test]
3777    fn test_schema_error_bad_types() {
3778        // verify incompatible schemas error on read
3779        let parquet_data = utf8_parquet();
3780
3781        // Ask to read it back with an incompatible schema (int vs string)
3782        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3783            "column1",
3784            arrow::datatypes::DataType::Int32,
3785            false,
3786        )]));
3787
3788        // read it back out
3789        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3790        let err =
3791            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3792                .unwrap_err();
3793        assert_eq!(
3794            err.to_string(),
3795            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
3796        )
3797    }
3798
3799    #[test]
3800    fn test_schema_error_bad_nullability() {
3801        // verify incompatible schemas error on read
3802        let parquet_data = utf8_parquet();
3803
3804        // Ask to read it back with an incompatible schema (nullability mismatch)
3805        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3806            "column1",
3807            arrow::datatypes::DataType::Utf8,
3808            true,
3809        )]));
3810
3811        // read it back out
3812        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3813        let err =
3814            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3815                .unwrap_err();
3816        assert_eq!(
3817            err.to_string(),
3818            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
3819        )
3820    }
3821
3822    #[test]
3823    fn test_read_binary_as_utf8() {
3824        let file = write_parquet_from_iter(vec![
3825            (
3826                "binary_to_utf8",
3827                Arc::new(BinaryArray::from(vec![
3828                    b"one".as_ref(),
3829                    b"two".as_ref(),
3830                    b"three".as_ref(),
3831                ])) as ArrayRef,
3832            ),
3833            (
3834                "large_binary_to_large_utf8",
3835                Arc::new(LargeBinaryArray::from(vec![
3836                    b"one".as_ref(),
3837                    b"two".as_ref(),
3838                    b"three".as_ref(),
3839                ])) as ArrayRef,
3840            ),
3841            (
3842                "binary_view_to_utf8_view",
3843                Arc::new(BinaryViewArray::from(vec![
3844                    b"one".as_ref(),
3845                    b"two".as_ref(),
3846                    b"three".as_ref(),
3847                ])) as ArrayRef,
3848            ),
3849        ]);
3850        let supplied_fields = Fields::from(vec![
3851            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3852            Field::new(
3853                "large_binary_to_large_utf8",
3854                ArrowDataType::LargeUtf8,
3855                false,
3856            ),
3857            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3858        ]);
3859
3860        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3861        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3862            file.try_clone().unwrap(),
3863            options,
3864        )
3865        .expect("reader builder with schema")
3866        .build()
3867        .expect("reader with schema");
3868
3869        let batch = arrow_reader.next().unwrap().unwrap();
3870        assert_eq!(batch.num_columns(), 3);
3871        assert_eq!(batch.num_rows(), 3);
3872        assert_eq!(
3873            batch
3874                .column(0)
3875                .as_string::<i32>()
3876                .iter()
3877                .collect::<Vec<_>>(),
3878            vec![Some("one"), Some("two"), Some("three")]
3879        );
3880
3881        assert_eq!(
3882            batch
3883                .column(1)
3884                .as_string::<i64>()
3885                .iter()
3886                .collect::<Vec<_>>(),
3887            vec![Some("one"), Some("two"), Some("three")]
3888        );
3889
3890        assert_eq!(
3891            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3892            vec![Some("one"), Some("two"), Some("three")]
3893        );
3894    }
3895
3896    #[test]
3897    #[should_panic(expected = "Invalid UTF8 sequence at")]
3898    fn test_read_non_utf8_binary_as_utf8() {
3899        let file = write_parquet_from_iter(vec![(
3900            "non_utf8_binary",
3901            Arc::new(BinaryArray::from(vec![
3902                b"\xDE\x00\xFF".as_ref(),
3903                b"\xDE\x01\xAA".as_ref(),
3904                b"\xDE\x02\xFF".as_ref(),
3905            ])) as ArrayRef,
3906        )]);
3907        let supplied_fields = Fields::from(vec![Field::new(
3908            "non_utf8_binary",
3909            ArrowDataType::Utf8,
3910            false,
3911        )]);
3912
3913        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3914        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3915            file.try_clone().unwrap(),
3916            options,
3917        )
3918        .expect("reader builder with schema")
3919        .build()
3920        .expect("reader with schema");
3921        arrow_reader.next().unwrap().unwrap_err();
3922    }
3923
3924    #[test]
3925    fn test_with_schema() {
3926        let nested_fields = Fields::from(vec![
3927            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3928            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3929        ]);
3930
3931        let nested_arrays: Vec<ArrayRef> = vec![
3932            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3933            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3934        ];
3935
3936        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3937
3938        let file = write_parquet_from_iter(vec![
3939            (
3940                "int32_to_ts_second",
3941                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3942            ),
3943            (
3944                "date32_to_date64",
3945                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3946            ),
3947            ("nested", Arc::new(nested) as ArrayRef),
3948        ]);
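        // The supplied reader schema below exercises type coercion on read:
        // Int32 -> Timestamp(Second), Date32 -> Date64, and, inside the nested struct,
        // Utf8 -> Dictionary<Int32, Utf8> and Int64 -> Timestamp(Nanosecond).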
3949
3950        let supplied_nested_fields = Fields::from(vec![
3951            Field::new(
3952                "utf8_to_dict",
3953                ArrowDataType::Dictionary(
3954                    Box::new(ArrowDataType::Int32),
3955                    Box::new(ArrowDataType::Utf8),
3956                ),
3957                false,
3958            ),
3959            Field::new(
3960                "int64_to_ts_nano",
3961                ArrowDataType::Timestamp(
3962                    arrow::datatypes::TimeUnit::Nanosecond,
3963                    Some("+10:00".into()),
3964                ),
3965                false,
3966            ),
3967        ]);
3968
3969        let supplied_schema = Arc::new(Schema::new(vec![
3970            Field::new(
3971                "int32_to_ts_second",
3972                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3973                false,
3974            ),
3975            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3976            Field::new(
3977                "nested",
3978                ArrowDataType::Struct(supplied_nested_fields),
3979                false,
3980            ),
3981        ]));
3982
3983        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3984        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3985            file.try_clone().unwrap(),
3986            options,
3987        )
3988        .expect("reader builder with schema")
3989        .build()
3990        .expect("reader with schema");
3991
3992        assert_eq!(arrow_reader.schema(), supplied_schema);
3993        let batch = arrow_reader.next().unwrap().unwrap();
3994        assert_eq!(batch.num_columns(), 3);
3995        assert_eq!(batch.num_rows(), 4);
3996        assert_eq!(
3997            batch
3998                .column(0)
3999                .as_any()
4000                .downcast_ref::<TimestampSecondArray>()
4001                .expect("downcast to timestamp second")
4002                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4003                .map(|v| v.to_string())
4004                .expect("value as datetime"),
4005            "1970-01-01 01:00:00 +01:00"
4006        );
4007        assert_eq!(
4008            batch
4009                .column(1)
4010                .as_any()
4011                .downcast_ref::<Date64Array>()
4012                .expect("downcast to date64")
4013                .value_as_date(0)
4014                .map(|v| v.to_string())
4015                .expect("value as date"),
4016            "1970-01-01"
4017        );
4018
4019        let nested = batch
4020            .column(2)
4021            .as_any()
4022            .downcast_ref::<StructArray>()
4023            .expect("downcast to struct");
4024
4025        let nested_dict = nested
4026            .column(0)
4027            .as_any()
4028            .downcast_ref::<Int32DictionaryArray>()
4029            .expect("downcast to dictionary");
4030
4031        assert_eq!(
4032            nested_dict
4033                .values()
4034                .as_any()
4035                .downcast_ref::<StringArray>()
4036                .expect("downcast to string")
4037                .iter()
4038                .collect::<Vec<_>>(),
4039            vec![Some("a"), Some("b")]
4040        );
4041
4042        assert_eq!(
4043            nested_dict.keys().iter().collect::<Vec<_>>(),
4044            vec![Some(0), Some(0), Some(0), Some(1)]
4045        );
4046
4047        assert_eq!(
4048            nested
4049                .column(1)
4050                .as_any()
4051                .downcast_ref::<TimestampNanosecondArray>()
4052                .expect("downcast to timestamp nanosecond")
4053                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4054                .map(|v| v.to_string())
4055                .expect("value as datetime"),
4056            "1970-01-01 10:00:00.000000001 +10:00"
4057        );
4058    }
4059
4060    #[test]
4061    fn test_empty_projection() {
4062        let testdata = arrow::util::test_util::parquet_test_data();
4063        let path = format!("{testdata}/alltypes_plain.parquet");
4064        let file = File::open(path).unwrap();
4065
4066        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4067        let file_metadata = builder.metadata().file_metadata();
4068        let expected_rows = file_metadata.num_rows() as usize;
4069
4070        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4071        let batch_reader = builder
4072            .with_projection(mask)
4073            .with_batch_size(2)
4074            .build()
4075            .unwrap();
4076
4077        let mut total_rows = 0;
4078        for maybe_batch in batch_reader {
4079            let batch = maybe_batch.unwrap();
4080            total_rows += batch.num_rows();
4081            assert_eq!(batch.num_columns(), 0);
4082            assert!(batch.num_rows() <= 2);
4083        }
4084
4085        assert_eq!(total_rows, expected_rows);
4086    }
4087
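    /// Writes two batches of `batch_size` empty-list rows using the given maximum
    /// row group size, then reads them back and checks that each returned batch
    /// contains exactly `batch_size` rows.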
4088    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4089        let schema = Arc::new(Schema::new(vec![Field::new(
4090            "list",
4091            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4092            true,
4093        )]));
4094
4095        let mut buf = Vec::with_capacity(1024);
4096
4097        let mut writer = ArrowWriter::try_new(
4098            &mut buf,
4099            schema.clone(),
4100            Some(
4101                WriterProperties::builder()
4102                    .set_max_row_group_size(row_group_size)
4103                    .build(),
4104            ),
4105        )
4106        .unwrap();
4107        for _ in 0..2 {
4108            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4109            for _ in 0..(batch_size) {
4110                list_builder.append(true);
4111            }
4112            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4113                .unwrap();
4114            writer.write(&batch).unwrap();
4115        }
4116        writer.close().unwrap();
4117
4118        let mut record_reader =
4119            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4120        assert_eq!(
4121            batch_size,
4122            record_reader.next().unwrap().unwrap().num_rows()
4123        );
4124        assert_eq!(
4125            batch_size,
4126            record_reader.next().unwrap().unwrap().num_rows()
4127        );
4128    }
4129
4130    #[test]
4131    fn test_row_group_exact_multiple() {
4132        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4133        test_row_group_batch(8, 8);
4134        test_row_group_batch(10, 8);
4135        test_row_group_batch(8, 10);
4136        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4137        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4138        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4139        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4140        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4141    }
4142
4143    /// Given a RecordBatch containing all the column data, return the batches expected when
4144    /// reading it with the given `batch_size` and `selection` (see the illustration below)
4145    fn get_expected_batches(
4146        column: &RecordBatch,
4147        selection: &RowSelection,
4148        batch_size: usize,
4149    ) -> Vec<RecordBatch> {
4150        let mut expected_batches = vec![];
4151
4152        let mut selection: VecDeque<_> = selection.clone().into();
4153        let mut row_offset = 0;
4154        let mut last_start = None;
4155        while row_offset < column.num_rows() && !selection.is_empty() {
4156            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4157            while batch_remaining > 0 && !selection.is_empty() {
4158                let (to_read, skip) = match selection.front_mut() {
4159                    Some(selection) if selection.row_count > batch_remaining => {
4160                        selection.row_count -= batch_remaining;
4161                        (batch_remaining, selection.skip)
4162                    }
4163                    Some(_) => {
4164                        let select = selection.pop_front().unwrap();
4165                        (select.row_count, select.skip)
4166                    }
4167                    None => break,
4168                };
4169
4170                batch_remaining -= to_read;
4171
4172                match skip {
4173                    true => {
4174                        if let Some(last_start) = last_start.take() {
4175                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4176                        }
4177                        row_offset += to_read
4178                    }
4179                    false => {
4180                        last_start.get_or_insert(row_offset);
4181                        row_offset += to_read
4182                    }
4183                }
4184            }
4185        }
4186
4187        if let Some(last_start) = last_start.take() {
4188            expected_batches.push(column.slice(last_start, row_offset - last_start))
4189        }
4190
4191        // Sanity check: every batch except the final one should contain exactly `batch_size` rows
4192        for batch in &expected_batches[..expected_batches.len() - 1] {
4193            assert_eq!(batch.num_rows(), batch_size);
4194        }
4195
4196        expected_batches
4197    }
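    // Added illustration (not part of the original test suite): a minimal sketch of how
    // `get_expected_batches` slices a column. For an 8 row Int32 column, an alternating
    // select-2 / skip-2 selection with `batch_size = 2` should yield two 2-row batches
    // containing rows 0-1 and rows 4-5.
    #[test]
    fn test_get_expected_batches_illustration() {
        let column = RecordBatch::try_from_iter([(
            "v",
            Arc::new(Int32Array::from_iter_values(0..8)) as ArrayRef,
        )])
        .unwrap();
        let selection = RowSelection::from(vec![
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(2),
            RowSelector::skip(2),
        ]);
        let expected = get_expected_batches(&column, &selection, 2);
        assert_eq!(expected, vec![column.slice(0, 2), column.slice(4, 2)]);
    }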
4198
4199    fn create_test_selection(
4200        step_len: usize,
4201        total_len: usize,
4202        skip_first: bool,
4203    ) -> (RowSelection, usize) {
4204        let mut remaining = total_len;
4205        let mut skip = skip_first;
4206        let mut vec = vec![];
4207        let mut selected_count = 0;
4208        while remaining != 0 {
4209            let step = if remaining > step_len {
4210                step_len
4211            } else {
4212                remaining
4213            };
4214            vec.push(RowSelector {
4215                row_count: step,
4216                skip,
4217            });
4218            remaining -= step;
4219            if !skip {
4220                selected_count += step;
4221            }
4222            skip = !skip;
4223        }
4224        (vec.into(), selected_count)
4225    }
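    // Added illustration (not part of the original test suite): a minimal sketch showing what
    // `create_test_selection` produces. With `step_len = 3`, `total_len = 10` and
    // `skip_first = false` it should emit alternating select/skip runs of up to 3 rows,
    // covering exactly 10 rows and selecting 6 of them.
    #[test]
    fn test_create_test_selection_illustration() {
        let (selection, selected) = create_test_selection(3, 10, false);
        let selectors: VecDeque<RowSelector> = selection.into();
        assert_eq!(
            selectors,
            VecDeque::from(vec![
                RowSelector::select(3),
                RowSelector::skip(3),
                RowSelector::select(3),
                RowSelector::skip(1),
            ])
        );
        assert_eq!(selected, 6);
    }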
4226
4227    #[test]
4228    fn test_scan_row_with_selection() {
4229        let testdata = arrow::util::test_util::parquet_test_data();
4230        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4231        let test_file = File::open(&path).unwrap();
4232
4233        let mut serial_reader =
4234            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4235        let data = serial_reader.next().unwrap().unwrap();
4236
4237        let do_test = |batch_size: usize, selection_len: usize| {
4238            for skip_first in [false, true] {
4239                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4240
4241                let expected = get_expected_batches(&data, &selections, batch_size);
4242                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4243                assert_eq!(
4244                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4245                    expected,
4246                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4247                );
4248            }
4249        };
4250
4251        // total row count 7300
4252        // 1. test selection len more than one page row count
4253        do_test(1000, 1000);
4254
4255        // 2. test selection len less than one page row count
4256        do_test(20, 20);
4257
4258        // 3. test selection_len less than batch_size
4259        do_test(20, 5);
4260
4261        // 4. test selection_len more than batch_size
4262        // (i.e. batch_size < selection_len)
4263        do_test(20, 50);
4264
4265        fn create_skip_reader(
4266            test_file: &File,
4267            batch_size: usize,
4268            selections: RowSelection,
4269        ) -> ParquetRecordBatchReader {
4270            let options = ArrowReaderOptions::new().with_page_index(true);
4271            let file = test_file.try_clone().unwrap();
4272            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4273                .unwrap()
4274                .with_batch_size(batch_size)
4275                .with_row_selection(selections)
4276                .build()
4277                .unwrap()
4278        }
4279    }
4280
4281    #[test]
4282    fn test_batch_size_overallocate() {
4283        let testdata = arrow::util::test_util::parquet_test_data();
4284        // `alltypes_plain.parquet` only has 8 rows
4285        let path = format!("{testdata}/alltypes_plain.parquet");
4286        let test_file = File::open(path).unwrap();
4287
4288        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4289        let num_rows = builder.metadata.file_metadata().num_rows();
4290        let reader = builder
4291            .with_batch_size(1024)
4292            .with_projection(ProjectionMask::all())
4293            .build()
4294            .unwrap();
4295        assert_ne!(1024, num_rows);
4296        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4297    }
4298
4299    #[test]
4300    fn test_read_with_page_index_enabled() {
4301        let testdata = arrow::util::test_util::parquet_test_data();
4302
4303        {
4304            // `alltypes_tiny_pages.parquet` has page index
4305            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4306            let test_file = File::open(path).unwrap();
4307            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4308                test_file,
4309                ArrowReaderOptions::new().with_page_index(true),
4310            )
4311            .unwrap();
4312            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4313            let reader = builder.build().unwrap();
4314            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4315            assert_eq!(batches.len(), 8);
4316        }
4317
4318        {
4319            // `alltypes_plain.parquet` doesn't have page index
4320            let path = format!("{testdata}/alltypes_plain.parquet");
4321            let test_file = File::open(path).unwrap();
4322            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4323                test_file,
4324                ArrowReaderOptions::new().with_page_index(true),
4325            )
4326            .unwrap();
4327            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4328            // the file should still be read successfully.
4329            assert!(builder.metadata().offset_index().is_none());
4330            let reader = builder.build().unwrap();
4331            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4332            assert_eq!(batches.len(), 1);
4333        }
4334    }
4335
4336    #[test]
4337    fn test_raw_repetition() {
4338        const MESSAGE_TYPE: &str = "
4339            message Log {
4340              OPTIONAL INT32 eventType;
4341              REPEATED INT32 category;
4342              REPEATED group filter {
4343                OPTIONAL INT32 error;
4344              }
4345            }
4346        ";
4347        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4348        let props = Default::default();
4349
4350        let mut buf = Vec::with_capacity(1024);
4351        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4352        let mut row_group_writer = writer.next_row_group().unwrap();
4353
4354        // column 0
4355        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4356        col_writer
4357            .typed::<Int32Type>()
4358            .write_batch(&[1], Some(&[1]), None)
4359            .unwrap();
4360        col_writer.close().unwrap();
4361        // column 1
4362        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4363        col_writer
4364            .typed::<Int32Type>()
4365            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4366            .unwrap();
4367        col_writer.close().unwrap();
4368        // column 2
4369        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4370        col_writer
4371            .typed::<Int32Type>()
4372            .write_batch(&[1], Some(&[1]), Some(&[0]))
4373            .unwrap();
4374        col_writer.close().unwrap();
4375
4376        let rg_md = row_group_writer.close().unwrap();
4377        assert_eq!(rg_md.num_rows(), 1);
4378        writer.close().unwrap();
4379
4380        let bytes = Bytes::from(buf);
4381
4382        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4383        let full = no_mask.next().unwrap().unwrap();
4384
4385        assert_eq!(full.num_columns(), 3);
4386
4387        for idx in 0..3 {
4388            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4389            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4390            let mut reader = b.with_projection(mask).build().unwrap();
4391            let projected = reader.next().unwrap().unwrap();
4392
4393            assert_eq!(projected.num_columns(), 1);
4394            assert_eq!(full.column(idx), projected.column(0));
4395        }
4396    }
4397
4398    #[test]
4399    fn test_read_lz4_raw() {
4400        let testdata = arrow::util::test_util::parquet_test_data();
4401        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4402        let file = File::open(path).unwrap();
4403
4404        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4405            .unwrap()
4406            .collect::<Result<Vec<_>, _>>()
4407            .unwrap();
4408        assert_eq!(batches.len(), 1);
4409        let batch = &batches[0];
4410
4411        assert_eq!(batch.num_columns(), 3);
4412        assert_eq!(batch.num_rows(), 4);
4413
4414        // https://github.com/apache/parquet-testing/pull/18
4415        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4416        assert_eq!(
4417            a.values(),
4418            &[1593604800, 1593604800, 1593604801, 1593604801]
4419        );
4420
4421        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4422        let a: Vec<_> = a.iter().flatten().collect();
4423        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4424
4425        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4426        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4427    }
4428
4429    // This test ensures backward compatibility: it reads 2 files that both declare the LZ4
4430    // CompressionCodec but use different algorithms: LZ4_HADOOP and LZ4_RAW.
4431    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4432    //    LZ4_HADOOP algorithm for compression.
4433    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4434    //    LZ4_RAW algorithm for compression. This fallback keeps backward compatibility with
4435    //    older parquet-cpp versions.
4436    //
4437    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4438    #[test]
4439    fn test_read_lz4_hadoop_fallback() {
4440        for file in [
4441            "hadoop_lz4_compressed.parquet",
4442            "non_hadoop_lz4_compressed.parquet",
4443        ] {
4444            let testdata = arrow::util::test_util::parquet_test_data();
4445            let path = format!("{testdata}/{file}");
4446            let file = File::open(path).unwrap();
4447            let expected_rows = 4;
4448
4449            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4450                .unwrap()
4451                .collect::<Result<Vec<_>, _>>()
4452                .unwrap();
4453            assert_eq!(batches.len(), 1);
4454            let batch = &batches[0];
4455
4456            assert_eq!(batch.num_columns(), 3);
4457            assert_eq!(batch.num_rows(), expected_rows);
4458
4459            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4460            assert_eq!(
4461                a.values(),
4462                &[1593604800, 1593604800, 1593604801, 1593604801]
4463            );
4464
4465            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4466            let b: Vec<_> = b.iter().flatten().collect();
4467            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4468
4469            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4470            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4471        }
4472    }
4473
4474    #[test]
4475    fn test_read_lz4_hadoop_large() {
4476        let testdata = arrow::util::test_util::parquet_test_data();
4477        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4478        let file = File::open(path).unwrap();
4479        let expected_rows = 10000;
4480
4481        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4482            .unwrap()
4483            .collect::<Result<Vec<_>, _>>()
4484            .unwrap();
4485        assert_eq!(batches.len(), 1);
4486        let batch = &batches[0];
4487
4488        assert_eq!(batch.num_columns(), 1);
4489        assert_eq!(batch.num_rows(), expected_rows);
4490
4491        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4492        let a: Vec<_> = a.iter().flatten().collect();
4493        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4494        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4495        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4496        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4497    }
4498
4499    #[test]
4500    #[cfg(feature = "snap")]
4501    fn test_read_nested_lists() {
4502        let testdata = arrow::util::test_util::parquet_test_data();
4503        let path = format!("{testdata}/nested_lists.snappy.parquet");
4504        let file = File::open(path).unwrap();
4505
4506        let f = file.try_clone().unwrap();
4507        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4508        let expected = reader.next().unwrap().unwrap();
4509        assert_eq!(expected.num_rows(), 3);
4510
4511        let selection = RowSelection::from(vec![
4512            RowSelector::skip(1),
4513            RowSelector::select(1),
4514            RowSelector::skip(1),
4515        ]);
4516        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4517            .unwrap()
4518            .with_row_selection(selection)
4519            .build()
4520            .unwrap();
4521
4522        let actual = reader.next().unwrap().unwrap();
4523        assert_eq!(actual.num_rows(), 1);
4524        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4525    }
4526
4527    #[test]
4528    fn test_arbitrary_decimal() {
4529        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4530        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4531            .with_precision_and_scale(19, 0)
4532            .unwrap();
4533        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4534            .with_precision_and_scale(12, 0)
4535            .unwrap();
4536        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4537            .with_precision_and_scale(17, 10)
4538            .unwrap();
4539
4540        let written = RecordBatch::try_from_iter([
4541            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4542            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4543            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4544        ])
4545        .unwrap();
4546
4547        let mut buffer = Vec::with_capacity(1024);
4548        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4549        writer.write(&written).unwrap();
4550        writer.close().unwrap();
4551
4552        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4553            .unwrap()
4554            .collect::<Result<Vec<_>, _>>()
4555            .unwrap();
4556
4557        assert_eq!(&written.slice(0, 8), &read[0]);
4558    }
4559
4560    #[test]
4561    fn test_list_skip() {
4562        let mut list = ListBuilder::new(Int32Builder::new());
4563        list.append_value([Some(1), Some(2)]);
4564        list.append_value([Some(3)]);
4565        list.append_value([Some(4)]);
4566        let list = list.finish();
4567        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4568
4569        // First page contains 2 values but only 1 row
4570        let props = WriterProperties::builder()
4571            .set_data_page_row_count_limit(1)
4572            .set_write_batch_size(2)
4573            .build();
4574
4575        let mut buffer = Vec::with_capacity(1024);
4576        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4577        writer.write(&batch).unwrap();
4578        writer.close().unwrap();
4579
4580        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4581        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4582            .unwrap()
4583            .with_row_selection(selection.into())
4584            .build()
4585            .unwrap();
4586        let out = reader.next().unwrap().unwrap();
4587        assert_eq!(out.num_rows(), 1);
4588        assert_eq!(out, batch.slice(2, 1));
4589    }
4590
4591    fn test_decimal32_roundtrip() {
4592        let d = |values: Vec<i32>, p: u8| {
4593            let iter = values.into_iter();
4594            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4595                .with_precision_and_scale(p, 2)
4596                .unwrap()
4597        };
4598
4599        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4600        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4601
4602        let mut buffer = Vec::with_capacity(1024);
4603        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4604        writer.write(&batch).unwrap();
4605        writer.close().unwrap();
4606
4607        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4608        let t1 = builder.parquet_schema().columns()[0].physical_type();
4609        assert_eq!(t1, PhysicalType::INT32);
4610
4611        let mut reader = builder.build().unwrap();
4612        assert_eq!(batch.schema(), reader.schema());
4613
4614        let out = reader.next().unwrap().unwrap();
4615        assert_eq!(batch, out);
4616    }
4617
4618    fn test_decimal64_roundtrip() {
4619        // Precision <= 9 -> INT32
4620        // Precision <= 18 -> INT64
4621
4622        let d = |values: Vec<i64>, p: u8| {
4623            let iter = values.into_iter();
4624            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4625                .with_precision_and_scale(p, 2)
4626                .unwrap()
4627        };
4628
4629        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4630        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4631        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4632
4633        let batch = RecordBatch::try_from_iter([
4634            ("d1", Arc::new(d1) as ArrayRef),
4635            ("d2", Arc::new(d2) as ArrayRef),
4636            ("d3", Arc::new(d3) as ArrayRef),
4637        ])
4638        .unwrap();
4639
4640        let mut buffer = Vec::with_capacity(1024);
4641        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4642        writer.write(&batch).unwrap();
4643        writer.close().unwrap();
4644
4645        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4646        let t1 = builder.parquet_schema().columns()[0].physical_type();
4647        assert_eq!(t1, PhysicalType::INT32);
4648        let t2 = builder.parquet_schema().columns()[1].physical_type();
4649        assert_eq!(t2, PhysicalType::INT64);
4650        let t3 = builder.parquet_schema().columns()[2].physical_type();
4651        assert_eq!(t3, PhysicalType::INT64);
4652
4653        let mut reader = builder.build().unwrap();
4654        assert_eq!(batch.schema(), reader.schema());
4655
4656        let out = reader.next().unwrap().unwrap();
4657        assert_eq!(batch, out);
4658    }
4659
4660    fn test_decimal_roundtrip<T: DecimalType>() {
4661        // Precision <= 9 -> INT32
4662        // Precision <= 18 -> INT64
4663        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4664
4665        let d = |values: Vec<usize>, p: u8| {
4666            let iter = values.into_iter().map(T::Native::usize_as);
4667            PrimitiveArray::<T>::from_iter_values(iter)
4668                .with_precision_and_scale(p, 2)
4669                .unwrap()
4670        };
4671
4672        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4673        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4674        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4675        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4676
4677        let batch = RecordBatch::try_from_iter([
4678            ("d1", Arc::new(d1) as ArrayRef),
4679            ("d2", Arc::new(d2) as ArrayRef),
4680            ("d3", Arc::new(d3) as ArrayRef),
4681            ("d4", Arc::new(d4) as ArrayRef),
4682        ])
4683        .unwrap();
4684
4685        let mut buffer = Vec::with_capacity(1024);
4686        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4687        writer.write(&batch).unwrap();
4688        writer.close().unwrap();
4689
4690        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4691        let t1 = builder.parquet_schema().columns()[0].physical_type();
4692        assert_eq!(t1, PhysicalType::INT32);
4693        let t2 = builder.parquet_schema().columns()[1].physical_type();
4694        assert_eq!(t2, PhysicalType::INT64);
4695        let t3 = builder.parquet_schema().columns()[2].physical_type();
4696        assert_eq!(t3, PhysicalType::INT64);
4697        let t4 = builder.parquet_schema().columns()[3].physical_type();
4698        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4699
4700        let mut reader = builder.build().unwrap();
4701        assert_eq!(batch.schema(), reader.schema());
4702
4703        let out = reader.next().unwrap().unwrap();
4704        assert_eq!(batch, out);
4705    }
4706
4707    #[test]
4708    fn test_decimal() {
4709        test_decimal32_roundtrip();
4710        test_decimal64_roundtrip();
4711        test_decimal_roundtrip::<Decimal128Type>();
4712        test_decimal_roundtrip::<Decimal256Type>();
4713    }
4714
4715    #[test]
4716    fn test_list_selection() {
4717        let schema = Arc::new(Schema::new(vec![Field::new_list(
4718            "list",
4719            Field::new_list_field(ArrowDataType::Utf8, true),
4720            false,
4721        )]));
4722        let mut buf = Vec::with_capacity(1024);
4723
4724        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4725
4726        for i in 0..2 {
4727            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4728            for j in 0..1024 {
4729                list_a_builder.values().append_value(format!("{i} {j}"));
4730                list_a_builder.append(true);
4731            }
4732            let batch =
4733                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4734                    .unwrap();
4735            writer.write(&batch).unwrap();
4736        }
4737        let _metadata = writer.close().unwrap();
4738
4739        let buf = Bytes::from(buf);
4740        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4741            .unwrap()
4742            .with_row_selection(RowSelection::from(vec![
4743                RowSelector::skip(100),
4744                RowSelector::select(924),
4745                RowSelector::skip(100),
4746                RowSelector::select(924),
4747            ]))
4748            .build()
4749            .unwrap();
4750
4751        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4752        let batch = concat_batches(&schema, &batches).unwrap();
4753
4754        assert_eq!(batch.num_rows(), 924 * 2);
4755        let list = batch.column(0).as_list::<i32>();
4756
4757        for w in list.value_offsets().windows(2) {
4758            assert_eq!(w[0] + 1, w[1])
4759        }
4760        let mut values = list.values().as_string::<i32>().iter();
4761
4762        for i in 0..2 {
4763            for j in 100..1024 {
4764                let expected = format!("{i} {j}");
4765                assert_eq!(values.next().unwrap().unwrap(), &expected);
4766            }
4767        }
4768    }
4769
4770    #[test]
4771    fn test_list_selection_fuzz() {
4772        let mut rng = rng();
4773        let schema = Arc::new(Schema::new(vec![Field::new_list(
4774            "list",
4775            Field::new_list(
4776                Field::LIST_FIELD_DEFAULT_NAME,
4777                Field::new_list_field(ArrowDataType::Int32, true),
4778                true,
4779            ),
4780            true,
4781        )]));
4782        let mut buf = Vec::with_capacity(1024);
4783        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4784
4785        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4786
4787        for _ in 0..2048 {
4788            if rng.random_bool(0.2) {
4789                list_a_builder.append(false);
4790                continue;
4791            }
4792
4793            let list_a_len = rng.random_range(0..10);
4794            let list_b_builder = list_a_builder.values();
4795
4796            for _ in 0..list_a_len {
4797                if rng.random_bool(0.2) {
4798                    list_b_builder.append(false);
4799                    continue;
4800                }
4801
4802                let list_b_len = rng.random_range(0..10);
4803                let int_builder = list_b_builder.values();
4804                for _ in 0..list_b_len {
4805                    match rng.random_bool(0.2) {
4806                        true => int_builder.append_null(),
4807                        false => int_builder.append_value(rng.random()),
4808                    }
4809                }
4810                list_b_builder.append(true)
4811            }
4812            list_a_builder.append(true);
4813        }
4814
4815        let array = Arc::new(list_a_builder.finish());
4816        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4817
4818        writer.write(&batch).unwrap();
4819        let _metadata = writer.close().unwrap();
4820
4821        let buf = Bytes::from(buf);
4822
4823        let cases = [
4824            vec![
4825                RowSelector::skip(100),
4826                RowSelector::select(924),
4827                RowSelector::skip(100),
4828                RowSelector::select(924),
4829            ],
4830            vec![
4831                RowSelector::select(924),
4832                RowSelector::skip(100),
4833                RowSelector::select(924),
4834                RowSelector::skip(100),
4835            ],
4836            vec![
4837                RowSelector::skip(1023),
4838                RowSelector::select(1),
4839                RowSelector::skip(1023),
4840                RowSelector::select(1),
4841            ],
4842            vec![
4843                RowSelector::select(1),
4844                RowSelector::skip(1023),
4845                RowSelector::select(1),
4846                RowSelector::skip(1023),
4847            ],
4848        ];
4849
4850        for batch_size in [100, 1024, 2048] {
4851            for selection in &cases {
4852                let selection = RowSelection::from(selection.clone());
4853                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4854                    .unwrap()
4855                    .with_row_selection(selection.clone())
4856                    .with_batch_size(batch_size)
4857                    .build()
4858                    .unwrap();
4859
4860                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4861                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4862                assert_eq!(actual.num_rows(), selection.row_count());
4863
4864                let mut batch_offset = 0;
4865                let mut actual_offset = 0;
4866                for selector in selection.iter() {
4867                    if selector.skip {
4868                        batch_offset += selector.row_count;
4869                        continue;
4870                    }
4871
4872                    assert_eq!(
4873                        batch.slice(batch_offset, selector.row_count),
4874                        actual.slice(actual_offset, selector.row_count)
4875                    );
4876
4877                    batch_offset += selector.row_count;
4878                    actual_offset += selector.row_count;
4879                }
4880            }
4881        }
4882    }
4883
4884    #[test]
4885    fn test_read_old_nested_list() {
4886        use arrow::datatypes::DataType;
4887        use arrow::datatypes::ToByteSlice;
4888
4889        let testdata = arrow::util::test_util::parquet_test_data();
4890        // message my_record {
4891        //     REQUIRED group a (LIST) {
4892        //         REPEATED group array (LIST) {
4893        //             REPEATED INT32 array;
4894        //         }
4895        //     }
4896        // }
4897        // should be read as list<list<int32>>
4898        let path = format!("{testdata}/old_list_structure.parquet");
4899        let test_file = File::open(path).unwrap();
4900
4901        // create expected ListArray
4902        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4903
4904        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
4905        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4906
4907        // Construct a list array from the above two
4908        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4909            "array",
4910            DataType::Int32,
4911            false,
4912        ))))
4913        .len(2)
4914        .add_buffer(a_value_offsets)
4915        .add_child_data(a_values.into_data())
4916        .build()
4917        .unwrap();
4918        let a = ListArray::from(a_list_data);
4919
4920        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4921        let mut reader = builder.build().unwrap();
4922        let out = reader.next().unwrap().unwrap();
4923        assert_eq!(out.num_rows(), 1);
4924        assert_eq!(out.num_columns(), 1);
4925        // grab first column
4926        let c0 = out.column(0);
4927        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4928        // get first row: [[1, 2], [3, 4]]
4929        let r0 = c0arr.value(0);
4930        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4931        assert_eq!(r0arr, &a);
4932    }
4933
4934    #[test]
4935    fn test_map_no_value() {
4936        // File schema:
4937        // message schema {
4938        //   required group my_map (MAP) {
4939        //     repeated group key_value {
4940        //       required int32 key;
4941        //       optional int32 value;
4942        //     }
4943        //   }
4944        //   required group my_map_no_v (MAP) {
4945        //     repeated group key_value {
4946        //       required int32 key;
4947        //     }
4948        //   }
4949        //   required group my_list (LIST) {
4950        //     repeated group list {
4951        //       required int32 element;
4952        //     }
4953        //   }
4954        // }
4955        let testdata = arrow::util::test_util::parquet_test_data();
4956        let path = format!("{testdata}/map_no_value.parquet");
4957        let file = File::open(path).unwrap();
4958
4959        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4960            .unwrap()
4961            .build()
4962            .unwrap();
4963        let out = reader.next().unwrap().unwrap();
4964        assert_eq!(out.num_rows(), 3);
4965        assert_eq!(out.num_columns(), 3);
4966        // my_map_no_v and my_list columns should now be equivalent
4967        let c0 = out.column(1).as_list::<i32>();
4968        let c1 = out.column(2).as_list::<i32>();
4969        assert_eq!(c0.len(), c1.len());
4970        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4971    }
4972
4973    #[test]
4974    fn test_get_row_group_column_bloom_filter_with_length() {
4975        // Rewrite the test data into a new parquet file whose metadata records bloom_filter_length
4976        let testdata = arrow::util::test_util::parquet_test_data();
4977        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
4978        let file = File::open(path).unwrap();
4979        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4980        let schema = builder.schema().clone();
4981        let reader = builder.build().unwrap();
4982
4983        let mut parquet_data = Vec::new();
4984        let props = WriterProperties::builder()
4985            .set_bloom_filter_enabled(true)
4986            .build();
4987        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
4988        for batch in reader {
4989            let batch = batch.unwrap();
4990            writer.write(&batch).unwrap();
4991        }
4992        writer.close().unwrap();
4993
4994        // test the new parquet file
4995        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
4996    }
4997
4998    #[test]
4999    fn test_get_row_group_column_bloom_filter_without_length() {
5000        let testdata = arrow::util::test_util::parquet_test_data();
5001        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5002        let data = Bytes::from(std::fs::read(path).unwrap());
5003        test_get_row_group_column_bloom_filter(data, false);
5004    }
5005
5006    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5007        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5008
5009        let metadata = builder.metadata();
5010        assert_eq!(metadata.num_row_groups(), 1);
5011        let row_group = metadata.row_group(0);
5012        let column = row_group.column(0);
5013        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5014
5015        let sbbf = builder
5016            .get_row_group_column_bloom_filter(0, 0)
5017            .unwrap()
5018            .unwrap();
5019        assert!(sbbf.check(&"Hello"));
5020        assert!(!sbbf.check(&"Hello_Not_Exists"));
5021    }
5022}