parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains reader which reads parquet data into arrow [`RecordBatch`]
19
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45    RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51// Exposed so integration tests and benchmarks can temporarily override the threshold.
52pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60/// Builder for constructing Parquet readers that decode into [Apache Arrow]
61/// arrays.
62///
63/// Most users should use one of the following specializations:
64///
65/// * synchronous API: [`ParquetRecordBatchReaderBuilder`]
66/// * `async` API: [`ParquetRecordBatchStreamBuilder`]
67/// * decoder API: [`ParquetPushDecoderBuilder`]
68///
69/// # Features
70/// * Projection pushdown: [`Self::with_projection`]
71/// * Cached metadata: [`ArrowReaderMetadata::load`]
/// * Offset and limit pushdown: [`Self::with_offset`] and [`Self::with_limit`]
73/// * Row group filtering: [`Self::with_row_groups`]
74/// * Range filtering: [`Self::with_row_selection`]
75/// * Row level filtering: [`Self::with_row_filter`]
76///
77/// # Implementing Predicate Pushdown
78///
79/// [`Self::with_row_filter`] permits filter evaluation *during* the decoding
/// process, which is efficient and enables the lowest level optimizations.
81///
/// However, most Parquet-based systems will apply filters at many steps prior
/// to decoding, such as pruning files, row groups, and data pages. This crate
84/// provides the low level APIs needed to implement such filtering, but does not
85/// include any logic to actually evaluate predicates. For example:
86///
87/// * [`Self::with_row_groups`] for Row Group pruning
88/// * [`Self::with_row_selection`] for data page pruning
89/// * [`StatisticsConverter`] to convert Parquet statistics to Arrow arrays
90///
91/// The rationale for this design is that implementing predicate pushdown is a
92/// complex topic and varies significantly from system to system. For example
93///
94/// 1. Predicates supported (do you support predicates like prefix matching, user defined functions, etc)
95/// 2. Evaluating predicates on multiple files (with potentially different but compatible schemas)
96/// 3. Evaluating predicates using information from an external metadata catalog (e.g. Apache Iceberg or similar)
97/// 4. Interleaving fetching metadata, evaluating predicates, and decoding files
98///
99/// You can read more about this design in the [Querying Parquet with
100/// Millisecond Latency] Arrow blog post.
101///
102/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
103/// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
104/// [Apache Arrow]: https://arrow.apache.org/
105/// [`StatisticsConverter`]: statistics::StatisticsConverter
106/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
107pub struct ArrowReaderBuilder<T> {
108    /// The "input" to read parquet data from.
109    ///
110    /// Note in the case of the [`ParquetPushDecoderBuilder`], there
111    /// is no underlying input, which is indicated by a type parameter of [`NoInput`]
112    ///
113    /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
114    /// [`NoInput`]: crate::arrow::push_decoder::NoInput
115    pub(crate) input: T,
116
117    pub(crate) metadata: Arc<ParquetMetaData>,
118
119    pub(crate) schema: SchemaRef,
120
121    pub(crate) fields: Option<Arc<ParquetField>>,
122
123    pub(crate) batch_size: usize,
124
125    pub(crate) row_groups: Option<Vec<usize>>,
126
127    pub(crate) projection: ProjectionMask,
128
129    pub(crate) filter: Option<RowFilter>,
130
131    pub(crate) selection: Option<RowSelection>,
132
133    pub(crate) row_selection_policy: RowSelectionPolicy,
134
135    pub(crate) limit: Option<usize>,
136
137    pub(crate) offset: Option<usize>,
138
139    pub(crate) metrics: ArrowReaderMetrics,
140
141    pub(crate) max_predicate_cache_size: usize,
142}
143
144impl<T: Debug> Debug for ArrowReaderBuilder<T> {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ArrowReaderBuilder<T>")
147            .field("input", &self.input)
148            .field("metadata", &self.metadata)
149            .field("schema", &self.schema)
150            .field("fields", &self.fields)
151            .field("batch_size", &self.batch_size)
152            .field("row_groups", &self.row_groups)
153            .field("projection", &self.projection)
154            .field("filter", &self.filter)
155            .field("selection", &self.selection)
156            .field("row_selection_policy", &self.row_selection_policy)
157            .field("limit", &self.limit)
158            .field("offset", &self.offset)
159            .field("metrics", &self.metrics)
160            .finish()
161    }
162}
163
164impl<T> ArrowReaderBuilder<T> {
165    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
166        Self {
167            input,
168            metadata: metadata.metadata,
169            schema: metadata.schema,
170            fields: metadata.fields,
171            batch_size: 1024,
172            row_groups: None,
173            projection: ProjectionMask::all(),
174            filter: None,
175            selection: None,
176            row_selection_policy: RowSelectionPolicy::default(),
177            limit: None,
178            offset: None,
179            metrics: ArrowReaderMetrics::Disabled,
180            max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
181        }
182    }
183
184    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
185    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
186        &self.metadata
187    }
188
189    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
190    pub fn parquet_schema(&self) -> &SchemaDescriptor {
191        self.metadata.file_metadata().schema_descr()
192    }
193
194    /// Returns the arrow [`SchemaRef`] for this parquet file
195    pub fn schema(&self) -> &SchemaRef {
196        &self.schema
197    }
198
    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    ///
    /// If `batch_size` is larger than the number of rows in the file, the file's
    /// row count is used instead.
201    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Cap the batch size at the file row count to avoid allocating an unnecessarily large buffer
203        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
204        Self { batch_size, ..self }
205    }
206
207    /// Only read data from the provided row group indexes
208    ///
209    /// This is also called row group filtering
210    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
211        Self {
212            row_groups: Some(row_groups),
213            ..self
214        }
215    }
216
217    /// Only read data from the provided column indexes
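    ///
    /// For example (a minimal sketch; `builder` is assumed to be an already
    /// constructed [`ParquetRecordBatchReaderBuilder`]), reading only the first
    /// leaf column:
    ///
    /// ```
    /// # use bytes::Bytes;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn project(builder: ParquetRecordBatchReaderBuilder<Bytes>) {
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let _reader = builder.with_projection(mask).build().unwrap();
    /// # }
    /// ```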
218    pub fn with_projection(self, mask: ProjectionMask) -> Self {
219        Self {
220            projection: mask,
221            ..self
222        }
223    }
224
    /// Configure how row selections should be materialized during execution.
226    ///
227    /// See [`RowSelectionPolicy`] for more details
228    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
229        Self {
230            row_selection_policy: policy,
231            ..self
232        }
233    }
234
235    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
236    /// data into memory.
237    ///
238    /// This feature is used to restrict which rows are decoded within row
239    /// groups, skipping ranges of rows that are not needed. Such selections
240    /// could be determined by evaluating predicates against the parquet page
241    /// [`Index`] or some other external information available to a query
242    /// engine.
243    ///
244    /// # Notes
245    ///
246    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
247    /// applying the row selection, and therefore rows from skipped row groups
248    /// should not be included in the [`RowSelection`] (see example below)
249    ///
    /// It is recommended to enable reading the page index if using this
251    /// functionality, to allow more efficient skipping over data pages. See
252    /// [`ArrowReaderOptions::with_page_index`].
253    ///
254    /// # Example
255    ///
256    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
257    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
258    /// row group 3:
259    ///
260    /// ```text
261    ///   Row Group 0, 1000 rows (selected)
262    ///   Row Group 1, 1000 rows (skipped)
263    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
264    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
265    /// ```
266    ///
267    /// You could pass the following [`RowSelection`]:
268    ///
269    /// ```text
270    ///  Select 1000    (scan all rows in row group 0)
271    ///  Skip 50        (skip the first 50 rows in row group 2)
272    ///  Select 50      (scan rows 50-100 in row group 2)
273    ///  Skip 900       (skip the remaining rows in row group 2)
274    ///  Skip 200       (skip the first 200 rows in row group 3)
275    ///  Select 100     (scan rows 200-300 in row group 3)
276    ///  Skip 700       (skip the remaining rows in row group 3)
277    /// ```
278    /// Note there is no entry for the (entirely) skipped row group 1.
279    ///
280    /// Note you can represent the same selection with fewer entries. Instead of
281    ///
282    /// ```text
283    ///  Skip 900       (skip the remaining rows in row group 2)
284    ///  Skip 200       (skip the first 200 rows in row group 3)
285    /// ```
286    ///
287    /// you could use
288    ///
289    /// ```text
290    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
291    /// ```
292    ///
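    /// For illustration, the selection described above could be constructed from
    /// [`RowSelector`]s roughly as follows (a minimal sketch, independent of any
    /// particular file):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```
    ///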
293    /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
294    pub fn with_row_selection(self, selection: RowSelection) -> Self {
295        Self {
296            selection: Some(selection),
297            ..self
298        }
299    }
300
301    /// Provide a [`RowFilter`] to skip decoding rows
302    ///
303    /// Row filters are applied after row group selection and row selection
304    ///
305    /// It is recommended to enable reading the page index if using this functionality, to allow
306    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
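    ///
    /// # Example
    ///
    /// A minimal sketch: build a predicate over the first leaf column with an
    /// [`ArrowPredicateFn`] and pass it to the builder as a [`RowFilter`]. The tiny
    /// in-memory file written here (a single `Int32` column) exists only to make
    /// the example self contained.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{BooleanArray, Int32Array, RecordBatch};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::{ArrowWriter, ProjectionMask};
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Evaluate the predicate against only the first leaf column
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     // Keep rows whose value is greater than 1 (a hand rolled comparison;
    ///     // real code would typically use an arrow compute kernel here)
    ///     let col = batch.column(0).as_primitive::<Int32Type>();
    ///     Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|v| v > 1))))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let total_rows: usize = reader.map(|b| b.unwrap().num_rows()).sum();
    /// assert_eq!(total_rows, 2); // only the rows with values 2 and 3 are decoded
    /// ```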
307    pub fn with_row_filter(self, filter: RowFilter) -> Self {
308        Self {
309            filter: Some(filter),
310            ..self
311        }
312    }
313
314    /// Provide a limit to the number of rows to be read
315    ///
316    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
317    /// allowing it to limit the final set of rows decoded after any pushed down predicates
318    ///
319    /// It is recommended to enable reading the page index if using this functionality, to allow
320    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
321    pub fn with_limit(self, limit: usize) -> Self {
322        Self {
323            limit: Some(limit),
324            ..self
325        }
326    }
327
328    /// Provide an offset to skip over the given number of rows
329    ///
330    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
331    /// allowing it to skip rows after any pushed down predicates
332    ///
333    /// It is recommended to enable reading the page index if using this functionality, to allow
334    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
335    pub fn with_offset(self, offset: usize) -> Self {
336        Self {
337            offset: Some(offset),
338            ..self
339        }
340    }
341
342    /// Specify metrics collection during reading
343    ///
344    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
345    /// clone of the provided metrics to the builder.
346    ///
347    /// For example:
348    ///
349    /// ```rust
350    /// # use std::sync::Arc;
351    /// # use bytes::Bytes;
352    /// # use arrow_array::{Int32Array, RecordBatch};
353    /// # use arrow_schema::{DataType, Field, Schema};
354    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
355    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
356    /// # use parquet::arrow::ArrowWriter;
357    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
358    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
359    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
360    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
361    /// # writer.write(&batch).unwrap();
362    /// # writer.close().unwrap();
363    /// # let file = Bytes::from(file);
364    /// // Create metrics object to pass into the reader
365    /// let metrics = ArrowReaderMetrics::enabled();
366    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
367    ///   // Configure the builder to use the metrics by passing a clone
368    ///   .with_metrics(metrics.clone())
369    ///   // Build the reader
370    ///   .build().unwrap();
371    /// // .. read data from the reader ..
372    ///
373    /// // check the metrics
374    /// assert!(metrics.records_read_from_inner().is_some());
375    /// ```
376    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
377        Self { metrics, ..self }
378    }
379
380    /// Set the maximum size (per row group) of the predicate cache in bytes for
381    /// the async decoder.
382    ///
383    /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
384    /// unlimited cache size.
385    ///
386    /// This cache is used to store decoded arrays that are used in
387    /// predicate evaluation ([`Self::with_row_filter`]).
388    ///
389    /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
390    /// [this ticket] for more details and alternatives.
391    ///
392    /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
393    /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
394    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
395        Self {
396            max_predicate_cache_size,
397            ..self
398        }
399    }
400}
401
402/// Options that control how [`ParquetMetaData`] is read when constructing
403/// an Arrow reader.
404///
405/// To use these options, pass them to one of the following methods:
406/// * [`ParquetRecordBatchReaderBuilder::try_new_with_options`]
407/// * [`ParquetRecordBatchStreamBuilder::new_with_options`]
408///
/// For fine-grained control over metadata loading, use
/// [`ArrowReaderMetadata::load`] to load metadata with these options.
///
/// See [`ArrowReaderBuilder`] for configuring how the column data is then read
/// from the file, including projection and filter pushdown.
414///
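/// # Example
///
/// A minimal sketch showing the options being applied when constructing a
/// synchronous reader; the tiny in-memory file written here exists only to make
/// the example self contained:
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
/// # let mut file: Vec<u8> = Vec::with_capacity(1024);
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// # let file = Bytes::from(file);
/// // Read the page index (if present) so data pages can be skipped more efficiently
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
///     .unwrap()
///     .build()
///     .unwrap();
/// let _batch = reader.next().unwrap().unwrap();
/// ```
///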
415/// [`ParquetRecordBatchStreamBuilder::new_with_options`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_options
416#[derive(Debug, Clone, Default)]
417pub struct ArrowReaderOptions {
418    /// Should the reader strip any user defined metadata from the Arrow schema
419    skip_arrow_metadata: bool,
420    /// If provided, used as the schema hint when determining the Arrow schema,
421    /// otherwise the schema hint is read from the [ARROW_SCHEMA_META_KEY]
422    ///
423    /// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
424    supplied_schema: Option<SchemaRef>,
425    /// Policy for reading offset and column indexes.
426    pub(crate) page_index_policy: PageIndexPolicy,
427    /// Options to control reading of Parquet metadata
428    metadata_options: ParquetMetaDataOptions,
429    /// If encryption is enabled, the file decryption properties can be provided
430    #[cfg(feature = "encryption")]
431    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
432
433    virtual_columns: Vec<FieldRef>,
434}
435
436impl ArrowReaderOptions {
437    /// Create a new [`ArrowReaderOptions`] with the default settings
438    pub fn new() -> Self {
439        Self::default()
440    }
441
442    /// Skip decoding the embedded arrow metadata (defaults to `false`)
443    ///
444    /// Parquet files generated by some writers may contain embedded arrow
445    /// schema and metadata.
    /// This embedded metadata may not be correct or compatible with your system;
    /// for example, see [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184).
448    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
449        Self {
450            skip_arrow_metadata,
451            ..self
452        }
453    }
454
455    /// Provide a schema hint to use when reading the Parquet file.
456    ///
457    /// If provided, this schema takes precedence over any arrow schema embedded
458    /// in the metadata (see the [`arrow`] documentation for more details).
459    ///
460    /// If the provided schema is not compatible with the data stored in the
461    /// parquet file schema, an error will be returned when constructing the
462    /// builder.
463    ///
464    /// This option is only required if you want to explicitly control the
465    /// conversion of Parquet types to Arrow types, such as casting a column to
    /// a different type. For example, you can read an Int64 column in
    /// a Parquet file as a [`TimestampMicrosecondArray`] in the Arrow schema.
468    ///
469    /// [`arrow`]: crate::arrow
470    /// [`TimestampMicrosecondArray`]: arrow_array::TimestampMicrosecondArray
471    ///
472    /// # Notes
473    ///
474    /// The provided schema must have the same number of columns as the parquet schema and
475    /// the column names must be the same.
476    ///
477    /// # Example
478    /// ```
480    /// use std::sync::Arc;
481    /// use tempfile::tempfile;
482    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
483    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
484    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
485    /// use parquet::arrow::ArrowWriter;
486    ///
487    /// // Write data - schema is inferred from the data to be Int32
488    /// let file = tempfile().unwrap();
489    /// let batch = RecordBatch::try_from_iter(vec![
490    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
491    /// ]).unwrap();
492    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
493    /// writer.write(&batch).unwrap();
494    /// writer.close().unwrap();
495    ///
496    /// // Read the file back.
497    /// // Supply a schema that interprets the Int32 column as a Timestamp.
498    /// let supplied_schema = Arc::new(Schema::new(vec![
499    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
500    /// ]));
501    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
502    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
503    ///     file.try_clone().unwrap(),
504    ///     options
505    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
506    ///
507    /// // Create the reader and read the data using the supplied schema.
508    /// let mut reader = builder.build().unwrap();
509    /// let _batch = reader.next().unwrap().unwrap();
510    /// ```
511    pub fn with_schema(self, schema: SchemaRef) -> Self {
512        Self {
513            supplied_schema: Some(schema),
514            skip_arrow_metadata: true,
515            ..self
516        }
517    }
518
519    /// Enable reading the [`PageIndex`] from the metadata, if present (defaults to `false`)
520    ///
    /// Query engines can use the `PageIndex` to push down predicates to the parquet
    /// scan, potentially eliminating unnecessary IO.
523    ///
524    /// If this is enabled, [`ParquetMetaData::column_index`] and
525    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
526    /// information is present in the file.
527    ///
528    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
529    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
530    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
531    pub fn with_page_index(self, page_index: bool) -> Self {
532        let page_index_policy = PageIndexPolicy::from(page_index);
533
534        Self {
535            page_index_policy,
536            ..self
537        }
538    }
539
540    /// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
541    ///
542    /// See [`Self::with_page_index`] for more details.
543    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
544        Self {
545            page_index_policy: policy,
546            ..self
547        }
548    }
549
550    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
551    /// footer will be skipped.
552    ///
553    /// This can be used to avoid reparsing the schema from the file when it is
554    /// already known.
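    ///
    /// For example (a minimal sketch, assuming the schema was previously obtained
    /// from already loaded [`ParquetMetaData`]; the helper name `configure` is
    /// purely illustrative):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn configure(metadata: &ParquetMetaData) -> ArrowReaderOptions {
    /// // Reuse the already-parsed schema instead of decoding it from the footer again
    /// let schema = metadata.file_metadata().schema_descr_ptr();
    /// ArrowReaderOptions::new().with_parquet_schema(schema)
    /// # }
    /// ```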
555    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
556        self.metadata_options.set_schema(schema);
557        self
558    }
559
560    /// Provide the file decryption properties to use when reading encrypted parquet files.
561    ///
562    /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
563    #[cfg(feature = "encryption")]
564    pub fn with_file_decryption_properties(
565        self,
566        file_decryption_properties: Arc<FileDecryptionProperties>,
567    ) -> Self {
568        Self {
569            file_decryption_properties: Some(file_decryption_properties),
570            ..self
571        }
572    }
573
574    /// Include virtual columns in the output.
575    ///
    /// Virtual columns are columns that are not part of the Parquet schema but are added to the output by the reader, such as row numbers.
577    ///
578    /// # Example
579    /// ```
580    /// # use std::sync::Arc;
581    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
582    /// # use arrow_schema::{DataType, Field, Schema};
583    /// # use parquet::arrow::{ArrowWriter, RowNumber};
584    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
585    /// # use tempfile::tempfile;
586    /// #
587    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
588    /// // Create a simple record batch with some data
589    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
590    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
591    ///
592    /// // Write the batch to a temporary parquet file
593    /// let file = tempfile()?;
594    /// let mut writer = ArrowWriter::try_new(
595    ///     file.try_clone()?,
596    ///     batch.schema(),
597    ///     None
598    /// )?;
599    /// writer.write(&batch)?;
600    /// writer.close()?;
601    ///
602    /// // Create a virtual column for row numbers
603    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
604    ///     .with_extension_type(RowNumber));
605    ///
606    /// // Configure options with virtual columns
607    /// let options = ArrowReaderOptions::new()
608    ///     .with_virtual_columns(vec![row_number_field])?;
609    ///
610    /// // Create a reader with the options
611    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
612    ///     file,
613    ///     options
614    /// )?
615    /// .build()?;
616    ///
617    /// // Read the batch - it will include both the original column and the virtual row_number column
618    /// let result_batch = reader.next().unwrap()?;
619    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
620    /// assert_eq!(result_batch.num_rows(), 3);
621    /// #
622    /// # Ok(())
623    /// # }
624    /// ```
625    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
626        // Validate that all fields are virtual columns
627        for field in &virtual_columns {
628            if !is_virtual_column(field) {
629                return Err(ParquetError::General(format!(
630                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
631                    field.name()
632                )));
633            }
634        }
635        Ok(Self {
636            virtual_columns,
637            ..self
638        })
639    }
640
641    /// Retrieve the currently set page index behavior.
642    ///
643    /// This can be set via [`with_page_index`][Self::with_page_index].
644    pub fn page_index(&self) -> bool {
645        self.page_index_policy != PageIndexPolicy::Skip
646    }
647
648    /// Retrieve the currently set metadata decoding options.
649    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
650        &self.metadata_options
651    }
652
653    /// Retrieve the currently set file decryption properties.
654    ///
655    /// This can be set via
656    /// [`file_decryption_properties`][Self::with_file_decryption_properties].
657    #[cfg(feature = "encryption")]
658    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
659        self.file_decryption_properties.as_ref()
660    }
661}
662
663/// The metadata necessary to construct a [`ArrowReaderBuilder`]
664///
/// Note this structure is cheaply cloneable as it consists of several [`Arc`]s.
666///
667/// This structure allows
668///
669/// 1. Loading metadata for a file once and then using that same metadata to
670///    construct multiple separate readers, for example, to distribute readers
671///    across multiple threads
672///
673/// 2. Using a cached copy of the [`ParquetMetadata`] rather than reading it
674///    from the file each time a reader is constructed.
675///
676/// [`ParquetMetadata`]: crate::file::metadata::ParquetMetaData
677#[derive(Debug, Clone)]
678pub struct ArrowReaderMetadata {
    /// The Parquet metadata, if known a priori
680    pub(crate) metadata: Arc<ParquetMetaData>,
681    /// The Arrow Schema
682    pub(crate) schema: SchemaRef,
683    /// The Parquet schema (root field)
684    pub(crate) fields: Option<Arc<ParquetField>>,
685}
686
687impl ArrowReaderMetadata {
688    /// Create [`ArrowReaderMetadata`] from the provided [`ArrowReaderOptions`]
689    /// and [`ChunkReader`]
690    ///
691    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
692    /// example of how this can be used
693    ///
694    /// # Notes
695    ///
    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to `true`, but
    /// the metadata is missing the page index, this function will attempt to load
    /// the page index by performing additional reads against `reader`.
699    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
700        let metadata = ParquetMetaDataReader::new()
701            .with_page_index_policy(options.page_index_policy)
702            .with_metadata_options(Some(options.metadata_options.clone()));
703        #[cfg(feature = "encryption")]
704        let metadata = metadata.with_decryption_properties(
705            options.file_decryption_properties.as_ref().map(Arc::clone),
706        );
707        let metadata = metadata.parse_and_finish(reader)?;
708        Self::try_new(Arc::new(metadata), options)
709    }
710
711    /// Create a new [`ArrowReaderMetadata`] from a pre-existing
712    /// [`ParquetMetaData`] and [`ArrowReaderOptions`].
713    ///
714    /// # Notes
715    ///
716    /// This function will not attempt to load the PageIndex if not present in the metadata, regardless
717    /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
718    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
719        match options.supplied_schema {
720            Some(supplied_schema) => Self::with_supplied_schema(
721                metadata,
722                supplied_schema.clone(),
723                &options.virtual_columns,
724            ),
725            None => {
726                let kv_metadata = match options.skip_arrow_metadata {
727                    true => None,
728                    false => metadata.file_metadata().key_value_metadata(),
729                };
730
731                let (schema, fields) = parquet_to_arrow_schema_and_fields(
732                    metadata.file_metadata().schema_descr(),
733                    ProjectionMask::all(),
734                    kv_metadata,
735                    &options.virtual_columns,
736                )?;
737
738                Ok(Self {
739                    metadata,
740                    schema: Arc::new(schema),
741                    fields: fields.map(Arc::new),
742                })
743            }
744        }
745    }
746
747    fn with_supplied_schema(
748        metadata: Arc<ParquetMetaData>,
749        supplied_schema: SchemaRef,
750        virtual_columns: &[FieldRef],
751    ) -> Result<Self> {
752        let parquet_schema = metadata.file_metadata().schema_descr();
753        let field_levels = parquet_to_arrow_field_levels_with_virtual(
754            parquet_schema,
755            ProjectionMask::all(),
756            Some(supplied_schema.fields()),
757            virtual_columns,
758        )?;
759        let fields = field_levels.fields;
760        let inferred_len = fields.len();
761        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
762        // Ensure the supplied schema has the same number of columns as the parquet schema.
        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
764        // different lengths, but we check here to be safe.
765        if inferred_len != supplied_len {
766            return Err(arrow_err!(format!(
767                "Incompatible supplied Arrow schema: expected {} columns received {}",
768                inferred_len, supplied_len
769            )));
770        }
771
772        let mut errors = Vec::new();
773
774        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
775
776        for (field1, field2) in field_iter {
777            if field1.data_type() != field2.data_type() {
778                errors.push(format!(
779                    "data type mismatch for field {}: requested {} but found {}",
780                    field1.name(),
781                    field1.data_type(),
782                    field2.data_type()
783                ));
784            }
785            if field1.is_nullable() != field2.is_nullable() {
786                errors.push(format!(
787                    "nullability mismatch for field {}: expected {:?} but found {:?}",
788                    field1.name(),
789                    field1.is_nullable(),
790                    field2.is_nullable()
791                ));
792            }
793            if field1.metadata() != field2.metadata() {
794                errors.push(format!(
795                    "metadata mismatch for field {}: expected {:?} but found {:?}",
796                    field1.name(),
797                    field1.metadata(),
798                    field2.metadata()
799                ));
800            }
801        }
802
803        if !errors.is_empty() {
804            let message = errors.join(", ");
805            return Err(ParquetError::ArrowError(format!(
806                "Incompatible supplied Arrow schema: {message}",
807            )));
808        }
809
810        Ok(Self {
811            metadata,
812            schema: supplied_schema,
813            fields: field_levels.levels.map(Arc::new),
814        })
815    }
816
817    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
818    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
819        &self.metadata
820    }
821
822    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
823    pub fn parquet_schema(&self) -> &SchemaDescriptor {
824        self.metadata.file_metadata().schema_descr()
825    }
826
827    /// Returns the arrow [`SchemaRef`] for this parquet file
828    pub fn schema(&self) -> &SchemaRef {
829        &self.schema
830    }
831}
832
833#[doc(hidden)]
// A newtype used within `ArrowReaderBuilder` to distinguish sync readers from async ones
835pub struct SyncReader<T: ChunkReader>(T);
836
837impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
838    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
839        f.debug_tuple("SyncReader").field(&self.0).finish()
840    }
841}
842
843/// Creates [`ParquetRecordBatchReader`] for reading Parquet files into Arrow [`RecordBatch`]es
844///
845/// # See Also
846/// * [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] for an async API
847/// * [`crate::arrow::push_decoder::ParquetPushDecoderBuilder`] for a SansIO decoder API
848/// * [`ArrowReaderBuilder`] for additional member functions
849pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
850
851impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
852    /// Create a new [`ParquetRecordBatchReaderBuilder`]
853    ///
854    /// ```
855    /// # use std::sync::Arc;
856    /// # use bytes::Bytes;
857    /// # use arrow_array::{Int32Array, RecordBatch};
858    /// # use arrow_schema::{DataType, Field, Schema};
859    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
860    /// # use parquet::arrow::ArrowWriter;
861    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
862    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
863    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
864    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
865    /// # writer.write(&batch).unwrap();
866    /// # writer.close().unwrap();
867    /// # let file = Bytes::from(file);
868    /// #
869    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
870    ///
871    /// // Inspect metadata
872    /// assert_eq!(builder.metadata().num_row_groups(), 1);
873    ///
874    /// // Construct reader
875    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
876    ///
877    /// // Read data
878    /// let _batch = reader.next().unwrap().unwrap();
879    /// ```
880    pub fn try_new(reader: T) -> Result<Self> {
881        Self::try_new_with_options(reader, Default::default())
882    }
883
884    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
885    ///
886    /// Use this method if you want to control the options for reading the
887    /// [`ParquetMetaData`]
888    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
889        let metadata = ArrowReaderMetadata::load(&reader, options)?;
890        Ok(Self::new_with_metadata(reader, metadata))
891    }
892
893    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
894    ///
895    /// Use this method if you already have [`ParquetMetaData`] for a file.
896    /// This interface allows:
897    ///
898    /// 1. Loading metadata once and using it to create multiple builders with
899    ///    potentially different settings or run on different threads
900    ///
901    /// 2. Using a cached copy of the metadata rather than re-reading it from the
902    ///    file each time a reader is constructed.
903    ///
904    /// See the docs on [`ArrowReaderMetadata`] for more details
905    ///
906    /// # Example
907    /// ```
908    /// # use std::fs::metadata;
909    /// # use std::sync::Arc;
910    /// # use bytes::Bytes;
911    /// # use arrow_array::{Int32Array, RecordBatch};
912    /// # use arrow_schema::{DataType, Field, Schema};
913    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
914    /// # use parquet::arrow::ArrowWriter;
915    /// #
916    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
917    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
918    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
919    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
920    /// # writer.write(&batch).unwrap();
921    /// # writer.close().unwrap();
922    /// # let file = Bytes::from(file);
923    /// #
924    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
925    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
926    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
927    ///
928    /// // Should be able to read from both in parallel
929    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
930    /// ```
931    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
932        Self::new_builder(SyncReader(input), metadata)
933    }
934
935    /// Read bloom filter for a column in a row group
936    ///
937    /// Returns `None` if the column does not have a bloom filter
938    ///
    /// This function should be called after other forms of pruning, such as projection and predicate pushdown.
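    ///
    /// # Example
    ///
    /// A minimal sketch: the tiny in-memory file written here has bloom filters
    /// enabled via `WriterProperties` purely to make the example self contained.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use bytes::Bytes;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # use parquet::file::properties::WriterProperties;
    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    /// # let props = WriterProperties::builder().set_bloom_filter_enabled(true).build();
    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(props)).unwrap();
    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    /// # writer.write(&batch).unwrap();
    /// # writer.close().unwrap();
    /// # let file = Bytes::from(file);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Probe the bloom filter for column 0 of row group 0
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
    ///     // `check` may return false positives but never false negatives
    ///     assert!(sbbf.check(&1_i32));
    /// }
    /// ```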
940    pub fn get_row_group_column_bloom_filter(
941        &self,
942        row_group_idx: usize,
943        column_idx: usize,
944    ) -> Result<Option<Sbbf>> {
945        let metadata = self.metadata.row_group(row_group_idx);
946        let column_metadata = metadata.column(column_idx);
947
948        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
949            offset
950                .try_into()
951                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
952        } else {
953            return Ok(None);
954        };
955
956        let buffer = match column_metadata.bloom_filter_length() {
957            Some(length) => self.input.0.get_bytes(offset, length as usize),
958            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
959        }?;
960
961        let (header, bitset_offset) =
962            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
963
964        match header.algorithm {
965            BloomFilterAlgorithm::BLOCK => {
966                // this match exists to future proof the singleton algorithm enum
967            }
968        }
969        match header.compression {
970            BloomFilterCompression::UNCOMPRESSED => {
971                // this match exists to future proof the singleton compression enum
972            }
973        }
974        match header.hash {
975            BloomFilterHash::XXHASH => {
976                // this match exists to future proof the singleton hash enum
977            }
978        }
979
980        let bitset = match column_metadata.bloom_filter_length() {
981            Some(_) => buffer.slice(
982                (TryInto::<usize>::try_into(bitset_offset).unwrap()
983                    - TryInto::<usize>::try_into(offset).unwrap())..,
984            ),
985            None => {
986                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
987                    ParquetError::General("Bloom filter length is invalid".to_string())
988                })?;
989                self.input.0.get_bytes(bitset_offset, bitset_length)?
990            }
991        };
992        Ok(Some(Sbbf::new(&bitset)))
993    }
994
995    /// Build a [`ParquetRecordBatchReader`]
996    ///
997    /// Note: this will eagerly evaluate any `RowFilter` before returning
998    pub fn build(self) -> Result<ParquetRecordBatchReader> {
999        let Self {
1000            input,
1001            metadata,
1002            schema: _,
1003            fields,
1004            batch_size,
1005            row_groups,
1006            projection,
1007            mut filter,
1008            selection,
1009            row_selection_policy,
1010            limit,
1011            offset,
1012            metrics,
1013            // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
1014            max_predicate_cache_size: _,
1015        } = self;
1016
        // Cap the batch size at the file row count to avoid allocating an unnecessarily large buffer
1018        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1019
1020        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1021
1022        let reader = ReaderRowGroups {
1023            reader: Arc::new(input.0),
1024            metadata,
1025            row_groups,
1026        };
1027
1028        let mut plan_builder = ReadPlanBuilder::new(batch_size)
1029            .with_selection(selection)
1030            .with_row_selection_policy(row_selection_policy);
1031
1032        // Update selection based on any filters
1033        if let Some(filter) = filter.as_mut() {
1034            for predicate in filter.predicates.iter_mut() {
1035                // break early if we have ruled out all rows
1036                if !plan_builder.selects_any() {
1037                    break;
1038                }
1039
1040                let mut cache_projection = predicate.projection().clone();
1041                cache_projection.intersect(&projection);
1042
1043                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1044                    .with_parquet_metadata(&reader.metadata)
1045                    .build_array_reader(fields.as_deref(), predicate.projection())?;
1046
1047                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1048            }
1049        }
1050
1051        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1052            .with_parquet_metadata(&reader.metadata)
1053            .build_array_reader(fields.as_deref(), &projection)?;
1054
1055        let read_plan = plan_builder
1056            .limited(reader.num_rows())
1057            .with_offset(offset)
1058            .with_limit(limit)
1059            .build_limited()
1060            .build();
1061
1062        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1063    }
1064}
1065
1066struct ReaderRowGroups<T: ChunkReader> {
1067    reader: Arc<T>,
1068
1069    metadata: Arc<ParquetMetaData>,
    /// The row group indices to scan
1071    row_groups: Vec<usize>,
1072}
1073
1074impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1075    fn num_rows(&self) -> usize {
1076        let meta = self.metadata.row_groups();
1077        self.row_groups
1078            .iter()
1079            .map(|x| meta[*x].num_rows() as usize)
1080            .sum()
1081    }
1082
1083    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1084        Ok(Box::new(ReaderPageIterator {
1085            column_idx: i,
1086            reader: self.reader.clone(),
1087            metadata: self.metadata.clone(),
1088            row_groups: self.row_groups.clone().into_iter(),
1089        }))
1090    }
1091
1092    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1093        Box::new(
1094            self.row_groups
1095                .iter()
1096                .map(move |i| self.metadata.row_group(*i)),
1097        )
1098    }
1099
1100    fn metadata(&self) -> &ParquetMetaData {
1101        self.metadata.as_ref()
1102    }
1103}
1104
1105struct ReaderPageIterator<T: ChunkReader> {
1106    reader: Arc<T>,
1107    column_idx: usize,
1108    row_groups: std::vec::IntoIter<usize>,
1109    metadata: Arc<ParquetMetaData>,
1110}
1111
1112impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1113    /// Return the next SerializedPageReader
1114    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1115        let rg = self.metadata.row_group(rg_idx);
1116        let column_chunk_metadata = rg.column(self.column_idx);
1117        let offset_index = self.metadata.offset_index();
        // `offset_index` may not exist, in which case `i[rg_idx]` will be empty.
        // To avoid a panic on `i[rg_idx][self.column_idx]`, filter out empty `i[rg_idx]`.
1120        let page_locations = offset_index
1121            .filter(|i| !i[rg_idx].is_empty())
1122            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1123        let total_rows = rg.num_rows() as usize;
1124        let reader = self.reader.clone();
1125
1126        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1127            .add_crypto_context(
1128                rg_idx,
1129                self.column_idx,
1130                self.metadata.as_ref(),
1131                column_chunk_metadata,
1132            )
1133    }
1134}
1135
1136impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1137    type Item = Result<Box<dyn PageReader>>;
1138
1139    fn next(&mut self) -> Option<Self::Item> {
1140        let rg_idx = self.row_groups.next()?;
1141        let page_reader = self
1142            .next_page_reader(rg_idx)
1143            .map(|page_reader| Box::new(page_reader) as _);
1144        Some(page_reader)
1145    }
1146}
1147
1148impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1149
1150/// Reads Parquet data as Arrow [`RecordBatch`]es
1151///
1152/// This struct implements the [`RecordBatchReader`] trait and is an
1153/// `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]es.
1154///
/// Typically reads from either a file or an in-memory buffer such as [`Bytes`]
1156///
1157/// Created by [`ParquetRecordBatchReaderBuilder`]
1158///
1159/// [`Bytes`]: bytes::Bytes
1160pub struct ParquetRecordBatchReader {
1161    array_reader: Box<dyn ArrayReader>,
1162    schema: SchemaRef,
1163    read_plan: ReadPlan,
1164}
1165
1166impl Debug for ParquetRecordBatchReader {
1167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1168        f.debug_struct("ParquetRecordBatchReader")
1169            .field("array_reader", &"...")
1170            .field("schema", &self.schema)
1171            .field("read_plan", &self.read_plan)
1172            .finish()
1173    }
1174}
1175
1176impl Iterator for ParquetRecordBatchReader {
1177    type Item = Result<RecordBatch, ArrowError>;
1178
1179    fn next(&mut self) -> Option<Self::Item> {
1180        self.next_inner()
1181            .map_err(|arrow_err| arrow_err.into())
1182            .transpose()
1183    }
1184}
1185
1186impl ParquetRecordBatchReader {
1187    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
1188    /// has reached the end of the file.
1189    ///
1190    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
1191    /// simplify error handling with `?`
1192    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1193        let mut read_records = 0;
1194        let batch_size = self.batch_size();
1195        if batch_size == 0 {
1196            return Ok(None);
1197        }
1198        match self.read_plan.row_selection_cursor_mut() {
1199            RowSelectionCursor::Mask(mask_cursor) => {
1200                // Stream the record batch reader using contiguous segments of the selection
1201                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
1202                while !mask_cursor.is_empty() {
1203                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1204                        return Ok(None);
1205                    };
1206
1207                    if mask_chunk.initial_skip > 0 {
1208                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1209                        if skipped != mask_chunk.initial_skip {
1210                            return Err(general_err!(
1211                                "failed to skip rows, expected {}, got {}",
1212                                mask_chunk.initial_skip,
1213                                skipped
1214                            ));
1215                        }
1216                    }
1217
1218                    if mask_chunk.chunk_rows == 0 {
1219                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1220                            return Ok(None);
1221                        }
1222                        continue;
1223                    }
1224
1225                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1226
1227                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1228                    if read == 0 {
1229                        return Err(general_err!(
1230                            "reached end of column while expecting {} rows",
1231                            mask_chunk.chunk_rows
1232                        ));
1233                    }
1234                    if read != mask_chunk.chunk_rows {
1235                        return Err(general_err!(
1236                            "insufficient rows read from array reader - expected {}, got {}",
1237                            mask_chunk.chunk_rows,
1238                            read
1239                        ));
1240                    }
1241
1242                    let array = self.array_reader.consume_batch()?;
1243                    // The column reader exposes the projection as a struct array; convert this
1244                    // into a record batch before applying the boolean filter mask.
1245                    let struct_array = array.as_struct_opt().ok_or_else(|| {
1246                        ArrowError::ParquetError(
1247                            "Struct array reader should return struct array".to_string(),
1248                        )
1249                    })?;
1250
1251                    let filtered_batch =
1252                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1253
1254                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
1255                        return Err(general_err!(
1256                            "filtered rows mismatch selection - expected {}, got {}",
1257                            mask_chunk.selected_rows,
1258                            filtered_batch.num_rows()
1259                        ));
1260                    }
1261
1262                    if filtered_batch.num_rows() == 0 {
1263                        continue;
1264                    }
1265
1266                    return Ok(Some(filtered_batch));
1267                }
1268            }
1269            RowSelectionCursor::Selectors(selectors_cursor) => {
1270                while read_records < batch_size && !selectors_cursor.is_empty() {
1271                    let front = selectors_cursor.next_selector();
1272                    if front.skip {
1273                        let skipped = self.array_reader.skip_records(front.row_count)?;
1274
1275                        if skipped != front.row_count {
1276                            return Err(general_err!(
1277                                "failed to skip rows, expected {}, got {}",
1278                                front.row_count,
1279                                skipped
1280                            ));
1281                        }
1282                        continue;
1283                    }
1284
                    // Currently, when a RowSelector with row_count = 0 is encountered, it is interpreted as the end of the reader.
                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
1287                    if front.row_count == 0 {
1288                        continue;
1289                    }
1290
1291                    // try to read record
1292                    let need_read = batch_size - read_records;
1293                    let to_read = match front.row_count.checked_sub(need_read) {
1294                        Some(remaining) if remaining != 0 => {
1295                            // if page row count less than batch_size we must set batch size to page row count.
1296                            // add check avoid dead loop
1297                            selectors_cursor.return_selector(RowSelector::select(remaining));
1298                            need_read
1299                        }
1300                        _ => front.row_count,
1301                    };
1302                    match self.array_reader.read_records(to_read)? {
1303                        0 => break,
1304                        rec => read_records += rec,
1305                    };
1306                }
1307            }
1308            RowSelectionCursor::All => {
1309                self.array_reader.read_records(batch_size)?;
1310            }
1311        };
1312
1313        let array = self.array_reader.consume_batch()?;
1314        let struct_array = array.as_struct_opt().ok_or_else(|| {
1315            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1316        })?;
1317
1318        Ok(if struct_array.len() > 0 {
1319            Some(RecordBatch::from(struct_array))
1320        } else {
1321            None
1322        })
1323    }
1324}
1325
1326impl RecordBatchReader for ParquetRecordBatchReader {
1327    /// Returns the projected [`SchemaRef`] for reading the parquet file.
1328    ///
1329    /// Note that the schema metadata will be stripped here. See
1330    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
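    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `data` holds an in-memory parquet file as `bytes::Bytes`):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use arrow_array::RecordBatchReader;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(data: Bytes) -> parquet::errors::Result<()> {
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
    /// // The builder's schema retains any embedded Arrow schema metadata
    /// let schema_with_metadata = builder.schema().clone();
    /// let reader = builder.build()?;
    /// // The reader's schema is the projected schema with the metadata stripped
    /// let projected_schema = reader.schema();
    /// # let _ = (schema_with_metadata, projected_schema);
    /// # Ok(())
    /// # }
    /// ```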
1331    fn schema(&self) -> SchemaRef {
1332        self.schema.clone()
1333    }
1334}
1335
1336impl ParquetRecordBatchReader {
1337    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
1338    ///
1339    /// See [`ParquetRecordBatchReaderBuilder`] for more options
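    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `data` holds an in-memory parquet file as `bytes::Bytes`):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # fn example(data: Bytes) -> parquet::errors::Result<()> {
    /// // Decode the file into Arrow `RecordBatch`es of up to 1024 rows each
    /// let reader = ParquetRecordBatchReader::try_new(data, 1024)?;
    /// for batch in reader {
    ///     println!("read {} rows", batch?.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```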
1340    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1341        ParquetRecordBatchReaderBuilder::try_new(reader)?
1342            .with_batch_size(batch_size)
1343            .build()
1344    }
1345
1346    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
1347    ///
1348    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
1349    /// higher-level interface for reading parquet data from a file.
1350    pub fn try_new_with_row_groups(
1351        levels: &FieldLevels,
1352        row_groups: &dyn RowGroups,
1353        batch_size: usize,
1354        selection: Option<RowSelection>,
1355    ) -> Result<Self> {
1356        // note metrics are not supported in this API
1357        let metrics = ArrowReaderMetrics::disabled();
1358        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1359            .with_parquet_metadata(row_groups.metadata())
1360            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1361
1362        let read_plan = ReadPlanBuilder::new(batch_size)
1363            .with_selection(selection)
1364            .build();
1365
1366        Ok(Self {
1367            array_reader,
1368            schema: Arc::new(Schema::new(levels.fields.clone())),
1369            read_plan,
1370        })
1371    }
1372
1373    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
1374    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
1375    /// all rows will be returned
1376    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1377        let schema = match array_reader.get_data_type() {
1378            ArrowType::Struct(fields) => Schema::new(fields.clone()),
1379            _ => unreachable!("Struct array reader's data type is not struct!"),
1380        };
1381
1382        Self {
1383            array_reader,
1384            schema: Arc::new(schema),
1385            read_plan,
1386        }
1387    }
1388
1389    #[inline(always)]
1390    pub(crate) fn batch_size(&self) -> usize {
1391        self.read_plan.batch_size()
1392    }
1393}
1394
1395#[cfg(test)]
1396pub(crate) mod tests {
1397    use std::cmp::min;
1398    use std::collections::{HashMap, VecDeque};
1399    use std::fmt::Formatter;
1400    use std::fs::File;
1401    use std::io::Seek;
1402    use std::path::PathBuf;
1403    use std::sync::Arc;
1404
1405    use rand::rngs::StdRng;
1406    use rand::{Rng, RngCore, SeedableRng, random, rng};
1407    use tempfile::tempfile;
1408
1409    use crate::arrow::arrow_reader::{
1410        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
1411        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
1412        RowSelectionPolicy, RowSelector,
1413    };
1414    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
1415    use crate::arrow::{ArrowWriter, ProjectionMask};
1416    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1417    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1418    use crate::data_type::{
1419        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1420        FloatType, Int32Type, Int64Type, Int96, Int96Type,
1421    };
1422    use crate::errors::Result;
1423    use crate::file::metadata::ParquetMetaData;
1424    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1425    use crate::file::writer::SerializedFileWriter;
1426    use crate::schema::parser::parse_message_type;
1427    use crate::schema::types::{Type, TypePtr};
1428    use crate::util::test_common::rand_gen::RandGen;
1429    use arrow::compute::kernels::cmp::eq;
1430    use arrow::compute::or;
1431    use arrow_array::builder::*;
1432    use arrow_array::cast::AsArray;
1433    use arrow_array::types::{
1434        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1435        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1436        Time64MicrosecondType,
1437    };
1438    use arrow_array::*;
1439    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1440    use arrow_data::{ArrayData, ArrayDataBuilder};
1441    use arrow_schema::{
1442        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1443    };
1444    use arrow_select::concat::concat_batches;
1445    use bytes::Bytes;
1446    use half::f16;
1447    use num_traits::PrimInt;
1448
1449    #[test]
1450    fn test_arrow_reader_all_columns() {
1451        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1452
1453        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1454        let original_schema = Arc::clone(builder.schema());
1455        let reader = builder.build().unwrap();
1456
1457        // Verify that the schema was correctly parsed
1458        assert_eq!(original_schema.fields(), reader.schema().fields());
1459    }
1460
1461    #[test]
1462    fn test_reuse_schema() {
1463        let file = get_test_file("parquet/alltypes-java.parquet");
1464
1465        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1466        let expected = builder.metadata;
1467        let schema = expected.file_metadata().schema_descr_ptr();
1468
1469        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1470        let builder =
1471            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1472
1473        // Verify that the metadata matches
1474        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1475    }
1476
1477    #[test]
1478    fn test_arrow_reader_single_column() {
1479        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1480
1481        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1482        let original_schema = Arc::clone(builder.schema());
1483
1484        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1485        let reader = builder.with_projection(mask).build().unwrap();
1486
1487        // Verify that the schema was correctly parsed
1488        assert_eq!(1, reader.schema().fields().len());
1489        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1490    }
1491
1492    #[test]
1493    fn test_arrow_reader_single_column_by_name() {
1494        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1495
1496        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1497        let original_schema = Arc::clone(builder.schema());
1498
1499        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1500        let reader = builder.with_projection(mask).build().unwrap();
1501
1502        // Verify that the schema was correctly parsed
1503        assert_eq!(1, reader.schema().fields().len());
1504        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1505    }
1506
1507    #[test]
1508    fn test_null_column_reader_test() {
1509        let mut file = tempfile::tempfile().unwrap();
1510
1511        let schema = "
1512            message message {
1513                OPTIONAL INT32 int32;
1514            }
1515        ";
1516        let schema = Arc::new(parse_message_type(schema).unwrap());
1517
1518        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1519        generate_single_column_file_with_data::<Int32Type>(
1520            &[vec![], vec![]],
1521            Some(&def_levels),
1522            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1523            schema,
1524            Some(Field::new("int32", ArrowDataType::Null, true)),
1525            &Default::default(),
1526        )
1527        .unwrap();
1528
1529        file.rewind().unwrap();
1530
1531        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1532        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1533
1534        assert_eq!(batches.len(), 4);
1535        for batch in &batches[0..3] {
1536            assert_eq!(batch.num_rows(), 2);
1537            assert_eq!(batch.num_columns(), 1);
1538            assert_eq!(batch.column(0).null_count(), 2);
1539        }
1540
1541        assert_eq!(batches[3].num_rows(), 1);
1542        assert_eq!(batches[3].num_columns(), 1);
1543        assert_eq!(batches[3].column(0).null_count(), 1);
1544    }
1545
1546    #[test]
1547    fn test_primitive_single_column_reader_test() {
1548        run_single_column_reader_tests::<BoolType, _, BoolType>(
1549            2,
1550            ConvertedType::NONE,
1551            None,
1552            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1553            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1554        );
1555        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1556            2,
1557            ConvertedType::NONE,
1558            None,
1559            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1560            &[
1561                Encoding::PLAIN,
1562                Encoding::RLE_DICTIONARY,
1563                Encoding::DELTA_BINARY_PACKED,
1564                Encoding::BYTE_STREAM_SPLIT,
1565            ],
1566        );
1567        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1568            2,
1569            ConvertedType::NONE,
1570            None,
1571            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1572            &[
1573                Encoding::PLAIN,
1574                Encoding::RLE_DICTIONARY,
1575                Encoding::DELTA_BINARY_PACKED,
1576                Encoding::BYTE_STREAM_SPLIT,
1577            ],
1578        );
1579        run_single_column_reader_tests::<FloatType, _, FloatType>(
1580            2,
1581            ConvertedType::NONE,
1582            None,
1583            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1584            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1585        );
1586    }
1587
1588    #[test]
1589    fn test_unsigned_primitive_single_column_reader_test() {
1590        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1591            2,
1592            ConvertedType::UINT_32,
1593            Some(ArrowDataType::UInt32),
1594            |vals| {
1595                Arc::new(UInt32Array::from_iter(
1596                    vals.iter().map(|x| x.map(|x| x as u32)),
1597                ))
1598            },
1599            &[
1600                Encoding::PLAIN,
1601                Encoding::RLE_DICTIONARY,
1602                Encoding::DELTA_BINARY_PACKED,
1603            ],
1604        );
1605        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1606            2,
1607            ConvertedType::UINT_64,
1608            Some(ArrowDataType::UInt64),
1609            |vals| {
1610                Arc::new(UInt64Array::from_iter(
1611                    vals.iter().map(|x| x.map(|x| x as u64)),
1612                ))
1613            },
1614            &[
1615                Encoding::PLAIN,
1616                Encoding::RLE_DICTIONARY,
1617                Encoding::DELTA_BINARY_PACKED,
1618            ],
1619        );
1620    }
1621
1622    #[test]
1623    fn test_unsigned_roundtrip() {
1624        let schema = Arc::new(Schema::new(vec![
1625            Field::new("uint32", ArrowDataType::UInt32, true),
1626            Field::new("uint64", ArrowDataType::UInt64, true),
1627        ]));
1628
1629        let mut buf = Vec::with_capacity(1024);
1630        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1631
1632        let original = RecordBatch::try_new(
1633            schema,
1634            vec![
1635                Arc::new(UInt32Array::from_iter_values([
1636                    0,
1637                    i32::MAX as u32,
1638                    u32::MAX,
1639                ])),
1640                Arc::new(UInt64Array::from_iter_values([
1641                    0,
1642                    i64::MAX as u64,
1643                    u64::MAX,
1644                ])),
1645            ],
1646        )
1647        .unwrap();
1648
1649        writer.write(&original).unwrap();
1650        writer.close().unwrap();
1651
1652        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1653        let ret = reader.next().unwrap().unwrap();
1654        assert_eq!(ret, original);
1655
1656        // Check they can be downcast to the correct type
1657        ret.column(0)
1658            .as_any()
1659            .downcast_ref::<UInt32Array>()
1660            .unwrap();
1661
1662        ret.column(1)
1663            .as_any()
1664            .downcast_ref::<UInt64Array>()
1665            .unwrap();
1666    }
1667
1668    #[test]
1669    fn test_float16_roundtrip() -> Result<()> {
1670        let schema = Arc::new(Schema::new(vec![
1671            Field::new("float16", ArrowDataType::Float16, false),
1672            Field::new("float16-nullable", ArrowDataType::Float16, true),
1673        ]));
1674
1675        let mut buf = Vec::with_capacity(1024);
1676        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1677
1678        let original = RecordBatch::try_new(
1679            schema,
1680            vec![
1681                Arc::new(Float16Array::from_iter_values([
1682                    f16::EPSILON,
1683                    f16::MIN,
1684                    f16::MAX,
1685                    f16::NAN,
1686                    f16::INFINITY,
1687                    f16::NEG_INFINITY,
1688                    f16::ONE,
1689                    f16::NEG_ONE,
1690                    f16::ZERO,
1691                    f16::NEG_ZERO,
1692                    f16::E,
1693                    f16::PI,
1694                    f16::FRAC_1_PI,
1695                ])),
1696                Arc::new(Float16Array::from(vec![
1697                    None,
1698                    None,
1699                    None,
1700                    Some(f16::NAN),
1701                    Some(f16::INFINITY),
1702                    Some(f16::NEG_INFINITY),
1703                    None,
1704                    None,
1705                    None,
1706                    None,
1707                    None,
1708                    None,
1709                    Some(f16::FRAC_1_PI),
1710                ])),
1711            ],
1712        )?;
1713
1714        writer.write(&original)?;
1715        writer.close()?;
1716
1717        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1718        let ret = reader.next().unwrap()?;
1719        assert_eq!(ret, original);
1720
1721        // Ensure can be downcast to the correct type
1722        ret.column(0).as_primitive::<Float16Type>();
1723        ret.column(1).as_primitive::<Float16Type>();
1724
1725        Ok(())
1726    }
1727
1728    #[test]
1729    fn test_time_utc_roundtrip() -> Result<()> {
1730        let schema = Arc::new(Schema::new(vec![
1731            Field::new(
1732                "time_millis",
1733                ArrowDataType::Time32(TimeUnit::Millisecond),
1734                true,
1735            )
1736            .with_metadata(HashMap::from_iter(vec![(
1737                "adjusted_to_utc".to_string(),
1738                "".to_string(),
1739            )])),
1740            Field::new(
1741                "time_micros",
1742                ArrowDataType::Time64(TimeUnit::Microsecond),
1743                true,
1744            )
1745            .with_metadata(HashMap::from_iter(vec![(
1746                "adjusted_to_utc".to_string(),
1747                "".to_string(),
1748            )])),
1749        ]));
1750
1751        let mut buf = Vec::with_capacity(1024);
1752        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1753
1754        let original = RecordBatch::try_new(
1755            schema,
1756            vec![
1757                Arc::new(Time32MillisecondArray::from(vec![
1758                    Some(-1),
1759                    Some(0),
1760                    Some(86_399_000),
1761                    Some(86_400_000),
1762                    Some(86_401_000),
1763                    None,
1764                ])),
1765                Arc::new(Time64MicrosecondArray::from(vec![
1766                    Some(-1),
1767                    Some(0),
1768                    Some(86_399 * 1_000_000),
1769                    Some(86_400 * 1_000_000),
1770                    Some(86_401 * 1_000_000),
1771                    None,
1772                ])),
1773            ],
1774        )?;
1775
1776        writer.write(&original)?;
1777        writer.close()?;
1778
1779        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1780        let ret = reader.next().unwrap()?;
1781        assert_eq!(ret, original);
1782
1783        // Ensure can be downcast to the correct type
1784        ret.column(0).as_primitive::<Time32MillisecondType>();
1785        ret.column(1).as_primitive::<Time64MicrosecondType>();
1786
1787        Ok(())
1788    }
1789
1790    #[test]
1791    fn test_date32_roundtrip() -> Result<()> {
1792        use arrow_array::Date32Array;
1793
1794        let schema = Arc::new(Schema::new(vec![Field::new(
1795            "date32",
1796            ArrowDataType::Date32,
1797            false,
1798        )]));
1799
1800        let mut buf = Vec::with_capacity(1024);
1801
1802        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1803
1804        let original = RecordBatch::try_new(
1805            schema,
1806            vec![Arc::new(Date32Array::from(vec![
1807                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1808            ]))],
1809        )?;
1810
1811        writer.write(&original)?;
1812        writer.close()?;
1813
1814        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1815        let ret = reader.next().unwrap()?;
1816        assert_eq!(ret, original);
1817
1818        // Ensure can be downcast to the correct type
1819        ret.column(0).as_primitive::<Date32Type>();
1820
1821        Ok(())
1822    }
1823
1824    #[test]
1825    fn test_date64_roundtrip() -> Result<()> {
1826        use arrow_array::Date64Array;
1827
1828        let schema = Arc::new(Schema::new(vec![
1829            Field::new("small-date64", ArrowDataType::Date64, false),
1830            Field::new("big-date64", ArrowDataType::Date64, false),
1831            Field::new("invalid-date64", ArrowDataType::Date64, false),
1832        ]));
1833
1834        let mut default_buf = Vec::with_capacity(1024);
1835        let mut coerce_buf = Vec::with_capacity(1024);
1836
1837        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1838
1839        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1840        let mut coerce_writer =
1841            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1842
1843        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1844
1845        let original = RecordBatch::try_new(
1846            schema,
1847            vec![
1848                // small-date64
1849                Arc::new(Date64Array::from(vec![
1850                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1851                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1852                    0,
1853                    1_000 * NUM_MILLISECONDS_IN_DAY,
1854                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1855                ])),
1856                // big-date64
1857                Arc::new(Date64Array::from(vec![
1858                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1859                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1860                    0,
1861                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1862                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1863                ])),
1864                // invalid-date64
1865                Arc::new(Date64Array::from(vec![
1866                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1867                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1868                    1,
1869                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1870                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1871                ])),
1872            ],
1873        )?;
1874
1875        default_writer.write(&original)?;
1876        coerce_writer.write(&original)?;
1877
1878        default_writer.close()?;
1879        coerce_writer.close()?;
1880
1881        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1882        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1883
1884        let default_ret = default_reader.next().unwrap()?;
1885        let coerce_ret = coerce_reader.next().unwrap()?;
1886
1887        // Roundtrip should be successful when default writer used
1888        assert_eq!(default_ret, original);
1889
1890        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1891        assert_eq!(coerce_ret.column(0), original.column(0));
1892        assert_ne!(coerce_ret.column(1), original.column(1));
1893        assert_ne!(coerce_ret.column(2), original.column(2));
1894
1895        // Ensure both can be downcast to the correct type
1896        default_ret.column(0).as_primitive::<Date64Type>();
1897        coerce_ret.column(0).as_primitive::<Date64Type>();
1898
1899        Ok(())
1900    }
1901    struct RandFixedLenGen {}
1902
1903    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1904        fn r#gen(len: i32) -> FixedLenByteArray {
1905            let mut v = vec![0u8; len as usize];
1906            rng().fill_bytes(&mut v);
1907            ByteArray::from(v).into()
1908        }
1909    }
1910
1911    #[test]
1912    fn test_fixed_length_binary_column_reader() {
1913        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1914            20,
1915            ConvertedType::NONE,
1916            None,
1917            |vals| {
1918                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1919                for val in vals {
1920                    match val {
1921                        Some(b) => builder.append_value(b).unwrap(),
1922                        None => builder.append_null(),
1923                    }
1924                }
1925                Arc::new(builder.finish())
1926            },
1927            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1928        );
1929    }
1930
1931    #[test]
1932    fn test_interval_day_time_column_reader() {
1933        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1934            12,
1935            ConvertedType::INTERVAL,
1936            None,
1937            |vals| {
1938                Arc::new(
1939                    vals.iter()
1940                        .map(|x| {
1941                            x.as_ref().map(|b| IntervalDayTime {
1942                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1943                                milliseconds: i32::from_le_bytes(
1944                                    b.as_ref()[8..12].try_into().unwrap(),
1945                                ),
1946                            })
1947                        })
1948                        .collect::<IntervalDayTimeArray>(),
1949                )
1950            },
1951            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1952        );
1953    }
1954
1955    #[test]
1956    fn test_int96_single_column_reader_test() {
1957        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1958
1959        type TypeHintAndConversionFunction =
1960            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1961
1962        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1963            // Test without a specified ArrowType hint.
1964            (None, |vals: &[Option<Int96>]| {
1965                Arc::new(TimestampNanosecondArray::from_iter(
1966                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1967                )) as ArrayRef
1968            }),
1969            // Test other TimeUnits as ArrowType hints.
1970            (
1971                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1972                |vals: &[Option<Int96>]| {
1973                    Arc::new(TimestampSecondArray::from_iter(
1974                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
1975                    )) as ArrayRef
1976                },
1977            ),
1978            (
1979                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1980                |vals: &[Option<Int96>]| {
1981                    Arc::new(TimestampMillisecondArray::from_iter(
1982                        vals.iter().map(|x| x.map(|x| x.to_millis())),
1983                    )) as ArrayRef
1984                },
1985            ),
1986            (
1987                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1988                |vals: &[Option<Int96>]| {
1989                    Arc::new(TimestampMicrosecondArray::from_iter(
1990                        vals.iter().map(|x| x.map(|x| x.to_micros())),
1991                    )) as ArrayRef
1992                },
1993            ),
1994            (
1995                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1996                |vals: &[Option<Int96>]| {
1997                    Arc::new(TimestampNanosecondArray::from_iter(
1998                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
1999                    )) as ArrayRef
2000                },
2001            ),
2002            // Test another timezone with TimeUnit as ArrowType hints.
2003            (
2004                Some(ArrowDataType::Timestamp(
2005                    TimeUnit::Second,
2006                    Some(Arc::from("-05:00")),
2007                )),
2008                |vals: &[Option<Int96>]| {
2009                    Arc::new(
2010                        TimestampSecondArray::from_iter(
2011                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
2012                        )
2013                        .with_timezone("-05:00"),
2014                    ) as ArrayRef
2015                },
2016            ),
2017        ];
2018
2019        resolutions.iter().for_each(|(arrow_type, converter)| {
2020            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2021                2,
2022                ConvertedType::NONE,
2023                arrow_type.clone(),
2024                converter,
2025                encodings,
2026            );
2027        })
2028    }
2029
2030    #[test]
2031    fn test_int96_from_spark_file_with_provided_schema() {
2032        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
2033        // resolution for range compared to a nanosecond timestamp. We must provide a schema with
2034        // microsecond resolution for the Parquet reader to interpret these values correctly.
2035        use arrow_schema::DataType::Timestamp;
2036        let test_data = arrow::util::test_util::parquet_test_data();
2037        let path = format!("{test_data}/int96_from_spark.parquet");
2038        let file = File::open(path).unwrap();
2039
2040        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2041            "a",
2042            Timestamp(TimeUnit::Microsecond, None),
2043            true,
2044        )]));
2045        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2046
2047        let mut record_reader =
2048            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2049                .unwrap()
2050                .build()
2051                .unwrap();
2052
2053        let batch = record_reader.next().unwrap().unwrap();
2054        assert_eq!(batch.num_columns(), 1);
2055        let column = batch.column(0);
2056        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2057
2058        let expected = Arc::new(Int64Array::from(vec![
2059            Some(1704141296123456),
2060            Some(1704070800000000),
2061            Some(253402225200000000),
2062            Some(1735599600000000),
2063            None,
2064            Some(9089380393200000000),
2065        ]));
2066
2067        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2068        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2069        // anyway, so this should be a zero-copy non-modifying cast.
2070
2071        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2072        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2073
2074        assert_eq!(casted_timestamps.len(), expected.len());
2075
2076        casted_timestamps
2077            .iter()
2078            .zip(expected.iter())
2079            .for_each(|(lhs, rhs)| {
2080                assert_eq!(lhs, rhs);
2081            });
2082    }
2083
2084    #[test]
2085    fn test_int96_from_spark_file_without_provided_schema() {
2086        // int96_from_spark.parquet was written based on Spark's microsecond timestamps, which trade
2087        // resolution for range compared to a nanosecond timestamp. Without a provided schema, some
2088        // values overflow when read at nanosecond resolution and result in garbage values.
2089        use arrow_schema::DataType::Timestamp;
2090        let test_data = arrow::util::test_util::parquet_test_data();
2091        let path = format!("{test_data}/int96_from_spark.parquet");
2092        let file = File::open(path).unwrap();
2093
2094        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2095            .unwrap()
2096            .build()
2097            .unwrap();
2098
2099        let batch = record_reader.next().unwrap().unwrap();
2100        assert_eq!(batch.num_columns(), 1);
2101        let column = batch.column(0);
2102        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2103
2104        let expected = Arc::new(Int64Array::from(vec![
2105            Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
2106            Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2107            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
2108            Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
2109            None,
2110            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
2111        ]));
2112
2113        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
2114        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
2115        // anyway, so this should be a zero-copy non-modifying cast.
2116
2117        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2118        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2119
2120        assert_eq!(casted_timestamps.len(), expected.len());
2121
2122        casted_timestamps
2123            .iter()
2124            .zip(expected.iter())
2125            .for_each(|(lhs, rhs)| {
2126                assert_eq!(lhs, rhs);
2127            });
2128    }
2129
2130    struct RandUtf8Gen {}
2131
2132    impl RandGen<ByteArrayType> for RandUtf8Gen {
2133        fn r#gen(len: i32) -> ByteArray {
2134            Int32Type::r#gen(len).to_string().as_str().into()
2135        }
2136    }
2137
2138    #[test]
2139    fn test_utf8_single_column_reader_test() {
2140        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2141            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2142                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2143            })))
2144        }
2145
2146        let encodings = &[
2147            Encoding::PLAIN,
2148            Encoding::RLE_DICTIONARY,
2149            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2150            Encoding::DELTA_BYTE_ARRAY,
2151        ];
2152
2153        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2154            2,
2155            ConvertedType::NONE,
2156            None,
2157            |vals| {
2158                Arc::new(BinaryArray::from_iter(
2159                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2160                ))
2161            },
2162            encodings,
2163        );
2164
2165        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2166            2,
2167            ConvertedType::UTF8,
2168            None,
2169            string_converter::<i32>,
2170            encodings,
2171        );
2172
2173        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2174            2,
2175            ConvertedType::UTF8,
2176            Some(ArrowDataType::Utf8),
2177            string_converter::<i32>,
2178            encodings,
2179        );
2180
2181        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2182            2,
2183            ConvertedType::UTF8,
2184            Some(ArrowDataType::LargeUtf8),
2185            string_converter::<i64>,
2186            encodings,
2187        );
2188
2189        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2190        for key in &small_key_types {
2191            for encoding in encodings {
2192                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2193                opts.encoding = *encoding;
2194
2195                let data_type =
2196                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2197
2198                // Cannot run full test suite as keys overflow, run small test instead
2199                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2200                    opts,
2201                    2,
2202                    ConvertedType::UTF8,
2203                    Some(data_type.clone()),
2204                    move |vals| {
2205                        let vals = string_converter::<i32>(vals);
2206                        arrow::compute::cast(&vals, &data_type).unwrap()
2207                    },
2208                );
2209            }
2210        }
2211
2212        let key_types = [
2213            ArrowDataType::Int16,
2214            ArrowDataType::UInt16,
2215            ArrowDataType::Int32,
2216            ArrowDataType::UInt32,
2217            ArrowDataType::Int64,
2218            ArrowDataType::UInt64,
2219        ];
2220
2221        for key in &key_types {
2222            let data_type =
2223                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2224
2225            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2226                2,
2227                ConvertedType::UTF8,
2228                Some(data_type.clone()),
2229                move |vals| {
2230                    let vals = string_converter::<i32>(vals);
2231                    arrow::compute::cast(&vals, &data_type).unwrap()
2232                },
2233                encodings,
2234            );
2235
2236            // https://github.com/apache/arrow-rs/issues/1179
2237            // let data_type = ArrowDataType::Dictionary(
2238            //     Box::new(key.clone()),
2239            //     Box::new(ArrowDataType::LargeUtf8),
2240            // );
2241            //
2242            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2243            //     2,
2244            //     ConvertedType::UTF8,
2245            //     Some(data_type.clone()),
2246            //     move |vals| {
2247            //         let vals = string_converter::<i64>(vals);
2248            //         arrow::compute::cast(&vals, &data_type).unwrap()
2249            //     },
2250            //     encodings,
2251            // );
2252        }
2253    }
2254
2255    #[test]
2256    fn test_decimal_nullable_struct() {
2257        let decimals = Decimal256Array::from_iter_values(
2258            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2259        );
2260
2261        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2262            "decimals",
2263            decimals.data_type().clone(),
2264            false,
2265        )])))
2266        .len(8)
2267        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2268        .child_data(vec![decimals.into_data()])
2269        .build()
2270        .unwrap();
2271
2272        let written =
2273            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2274                .unwrap();
2275
2276        let mut buffer = Vec::with_capacity(1024);
2277        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2278        writer.write(&written).unwrap();
2279        writer.close().unwrap();
2280
2281        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2282            .unwrap()
2283            .collect::<Result<Vec<_>, _>>()
2284            .unwrap();
2285
2286        assert_eq!(&written.slice(0, 3), &read[0]);
2287        assert_eq!(&written.slice(3, 3), &read[1]);
2288        assert_eq!(&written.slice(6, 2), &read[2]);
2289    }
2290
2291    #[test]
2292    fn test_int32_nullable_struct() {
2293        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2294        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2295            "int32",
2296            int32.data_type().clone(),
2297            false,
2298        )])))
2299        .len(8)
2300        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2301        .child_data(vec![int32.into_data()])
2302        .build()
2303        .unwrap();
2304
2305        let written =
2306            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2307                .unwrap();
2308
2309        let mut buffer = Vec::with_capacity(1024);
2310        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2311        writer.write(&written).unwrap();
2312        writer.close().unwrap();
2313
2314        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2315            .unwrap()
2316            .collect::<Result<Vec<_>, _>>()
2317            .unwrap();
2318
2319        assert_eq!(&written.slice(0, 3), &read[0]);
2320        assert_eq!(&written.slice(3, 3), &read[1]);
2321        assert_eq!(&written.slice(6, 2), &read[2]);
2322    }
2323
2324    #[test]
2325    fn test_decimal_list() {
2326        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2327
2328        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
2329        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2330            decimals.data_type().clone(),
2331            false,
2332        ))))
2333        .len(7)
2334        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2335        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2336        .child_data(vec![decimals.into_data()])
2337        .build()
2338        .unwrap();
2339
2340        let written =
2341            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2342                .unwrap();
2343
2344        let mut buffer = Vec::with_capacity(1024);
2345        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2346        writer.write(&written).unwrap();
2347        writer.close().unwrap();
2348
2349        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2350            .unwrap()
2351            .collect::<Result<Vec<_>, _>>()
2352            .unwrap();
2353
2354        assert_eq!(&written.slice(0, 3), &read[0]);
2355        assert_eq!(&written.slice(3, 3), &read[1]);
2356        assert_eq!(&written.slice(6, 1), &read[2]);
2357    }
2358
2359    #[test]
2360    fn test_read_decimal_file() {
2361        use arrow_array::Decimal128Array;
2362        let testdata = arrow::util::test_util::parquet_test_data();
2363        let file_variants = vec![
2364            ("byte_array", 4),
2365            ("fixed_length", 25),
2366            ("int32", 4),
2367            ("int64", 10),
2368        ];
2369        for (prefix, target_precision) in file_variants {
2370            let path = format!("{testdata}/{prefix}_decimal.parquet");
2371            let file = File::open(path).unwrap();
2372            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2373
2374            let batch = record_reader.next().unwrap().unwrap();
2375            assert_eq!(batch.num_rows(), 24);
2376            let col = batch
2377                .column(0)
2378                .as_any()
2379                .downcast_ref::<Decimal128Array>()
2380                .unwrap();
2381
2382            let expected = 1..25;
2383
2384            assert_eq!(col.precision(), target_precision);
2385            assert_eq!(col.scale(), 2);
2386
2387            for (i, v) in expected.enumerate() {
2388                assert_eq!(col.value(i), v * 100_i128);
2389            }
2390        }
2391    }
2392
2393    #[test]
2394    fn test_read_float16_nonzeros_file() {
2395        use arrow_array::Float16Array;
2396        let testdata = arrow::util::test_util::parquet_test_data();
2397        // see https://github.com/apache/parquet-testing/pull/40
2398        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2399        let file = File::open(path).unwrap();
2400        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2401
2402        let batch = record_reader.next().unwrap().unwrap();
2403        assert_eq!(batch.num_rows(), 8);
2404        let col = batch
2405            .column(0)
2406            .as_any()
2407            .downcast_ref::<Float16Array>()
2408            .unwrap();
2409
2410        let f16_two = f16::ONE + f16::ONE;
2411
2412        assert_eq!(col.null_count(), 1);
2413        assert!(col.is_null(0));
2414        assert_eq!(col.value(1), f16::ONE);
2415        assert_eq!(col.value(2), -f16_two);
2416        assert!(col.value(3).is_nan());
2417        assert_eq!(col.value(4), f16::ZERO);
2418        assert!(col.value(4).is_sign_positive());
2419        assert_eq!(col.value(5), f16::NEG_ONE);
2420        assert_eq!(col.value(6), f16::NEG_ZERO);
2421        assert!(col.value(6).is_sign_negative());
2422        assert_eq!(col.value(7), f16_two);
2423    }
2424
2425    #[test]
2426    fn test_read_float16_zeros_file() {
2427        use arrow_array::Float16Array;
2428        let testdata = arrow::util::test_util::parquet_test_data();
2429        // see https://github.com/apache/parquet-testing/pull/40
2430        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2431        let file = File::open(path).unwrap();
2432        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2433
2434        let batch = record_reader.next().unwrap().unwrap();
2435        assert_eq!(batch.num_rows(), 3);
2436        let col = batch
2437            .column(0)
2438            .as_any()
2439            .downcast_ref::<Float16Array>()
2440            .unwrap();
2441
2442        assert_eq!(col.null_count(), 1);
2443        assert!(col.is_null(0));
2444        assert_eq!(col.value(1), f16::ZERO);
2445        assert!(col.value(1).is_sign_positive());
2446        assert!(col.value(2).is_nan());
2447    }
2448
2449    #[test]
2450    fn test_read_float32_float64_byte_stream_split() {
2451        let path = format!(
2452            "{}/byte_stream_split.zstd.parquet",
2453            arrow::util::test_util::parquet_test_data(),
2454        );
2455        let file = File::open(path).unwrap();
2456        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2457
2458        let mut row_count = 0;
2459        for batch in record_reader {
2460            let batch = batch.unwrap();
2461            row_count += batch.num_rows();
2462            let f32_col = batch.column(0).as_primitive::<Float32Type>();
2463            let f64_col = batch.column(1).as_primitive::<Float64Type>();
2464
2465            // This file contains floats from a standard normal distribution
2466            for &x in f32_col.values() {
2467                assert!(x > -10.0);
2468                assert!(x < 10.0);
2469            }
2470            for &x in f64_col.values() {
2471                assert!(x > -10.0);
2472                assert!(x < 10.0);
2473            }
2474        }
2475        assert_eq!(row_count, 300);
2476    }
2477
2478    #[test]
2479    fn test_read_extended_byte_stream_split() {
2480        let path = format!(
2481            "{}/byte_stream_split_extended.gzip.parquet",
2482            arrow::util::test_util::parquet_test_data(),
2483        );
2484        let file = File::open(path).unwrap();
2485        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2486
2487        let mut row_count = 0;
2488        for batch in record_reader {
2489            let batch = batch.unwrap();
2490            row_count += batch.num_rows();
2491
2492            // 0,1 are f16
2493            let f16_col = batch.column(0).as_primitive::<Float16Type>();
2494            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2495            assert_eq!(f16_col.len(), f16_bss.len());
2496            f16_col
2497                .iter()
2498                .zip(f16_bss.iter())
2499                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2500
2501            // 2,3 are f32
2502            let f32_col = batch.column(2).as_primitive::<Float32Type>();
2503            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2504            assert_eq!(f32_col.len(), f32_bss.len());
2505            f32_col
2506                .iter()
2507                .zip(f32_bss.iter())
2508                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2509
2510            // 4,5 are f64
2511            let f64_col = batch.column(4).as_primitive::<Float64Type>();
2512            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2513            assert_eq!(f64_col.len(), f64_bss.len());
2514            f64_col
2515                .iter()
2516                .zip(f64_bss.iter())
2517                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2518
2519            // 6,7 are i32
2520            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2521            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2522            assert_eq!(i32_col.len(), i32_bss.len());
2523            i32_col
2524                .iter()
2525                .zip(i32_bss.iter())
2526                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2527
2528            // 8,9 are i64
2529            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2530            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2531            assert_eq!(i64_col.len(), i64_bss.len());
2532            i64_col
2533                .iter()
2534                .zip(i64_bss.iter())
2535                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2536
2537            // 10,11 are FLBA(5)
2538            let flba_col = batch.column(10).as_fixed_size_binary();
2539            let flba_bss = batch.column(11).as_fixed_size_binary();
2540            assert_eq!(flba_col.len(), flba_bss.len());
2541            flba_col
2542                .iter()
2543                .zip(flba_bss.iter())
2544                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2545
2546            // 12,13 are FLBA(4) (decimal(7,3))
2547            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2548            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2549            assert_eq!(dec_col.len(), dec_bss.len());
2550            dec_col
2551                .iter()
2552                .zip(dec_bss.iter())
2553                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2554        }
2555        assert_eq!(row_count, 200);
2556    }
2557
2558    #[test]
2559    fn test_read_incorrect_map_schema_file() {
2560        let testdata = arrow::util::test_util::parquet_test_data();
2561        // see https://github.com/apache/parquet-testing/pull/47
2562        let path = format!("{testdata}/incorrect_map_schema.parquet");
2563        let file = File::open(path).unwrap();
2564        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2565
2566        let batch = record_reader.next().unwrap().unwrap();
2567        assert_eq!(batch.num_rows(), 1);
2568
2569        let expected_schema = Schema::new(vec![Field::new(
2570            "my_map",
2571            ArrowDataType::Map(
2572                Arc::new(Field::new(
2573                    "key_value",
2574                    ArrowDataType::Struct(Fields::from(vec![
2575                        Field::new("key", ArrowDataType::Utf8, false),
2576                        Field::new("value", ArrowDataType::Utf8, true),
2577                    ])),
2578                    false,
2579                )),
2580                false,
2581            ),
2582            true,
2583        )]);
2584        assert_eq!(batch.schema().as_ref(), &expected_schema);
2585
2586        assert_eq!(batch.num_rows(), 1);
2587        assert_eq!(batch.column(0).null_count(), 0);
2588        assert_eq!(
2589            batch.column(0).as_map().keys().as_ref(),
2590            &StringArray::from(vec!["parent", "name"])
2591        );
2592        assert_eq!(
2593            batch.column(0).as_map().values().as_ref(),
2594            &StringArray::from(vec!["another", "report"])
2595        );
2596    }
2597
2598    #[test]
2599    fn test_read_dict_fixed_size_binary() {
2600        let schema = Arc::new(Schema::new(vec![Field::new(
2601            "a",
2602            ArrowDataType::Dictionary(
2603                Box::new(ArrowDataType::UInt8),
2604                Box::new(ArrowDataType::FixedSizeBinary(8)),
2605            ),
2606            true,
2607        )]));
2608        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2609        let values = FixedSizeBinaryArray::try_from_iter(
2610            vec![
2611                (0u8..8u8).collect::<Vec<u8>>(),
2612                (24u8..32u8).collect::<Vec<u8>>(),
2613            ]
2614            .into_iter(),
2615        )
2616        .unwrap();
2617        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2618        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2619
2620        let mut buffer = Vec::with_capacity(1024);
2621        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2622        writer.write(&batch).unwrap();
2623        writer.close().unwrap();
2624        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2625            .unwrap()
2626            .collect::<Result<Vec<_>, _>>()
2627            .unwrap();
2628
2629        assert_eq!(read.len(), 1);
2630        assert_eq!(&batch, &read[0])
2631    }
2632
2633    #[test]
2634    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2635        // The `StructArrayReader` checks the definition and repetition levels of the first
2636        // child column in the struct to determine nullability for the struct. If the first
2637        // column is being read by `ByteArrayDictionaryReader`, we need to ensure that the
2638        // nullability is interpreted correctly from the rep/def level buffers managed by
2639        // this array reader.
2640
2641        let struct_fields = Fields::from(vec![
2642            Field::new(
2643                "city",
2644                ArrowDataType::Dictionary(
2645                    Box::new(ArrowDataType::UInt8),
2646                    Box::new(ArrowDataType::Utf8),
2647                ),
2648                true,
2649            ),
2650            Field::new("name", ArrowDataType::Utf8, true),
2651        ]);
2652        let schema = Arc::new(Schema::new(vec![Field::new(
2653            "items",
2654            ArrowDataType::Struct(struct_fields.clone()),
2655            true,
2656        )]));
2657
2658        let items_arr = StructArray::new(
2659            struct_fields,
2660            vec![
2661                Arc::new(DictionaryArray::new(
2662                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2663                    Arc::new(StringArray::from_iter_values(vec![
2664                        "quebec",
2665                        "fredericton",
2666                        "halifax",
2667                    ])),
2668                )),
2669                Arc::new(StringArray::from_iter_values(vec![
2670                    "albert", "terry", "lance", "", "tim",
2671                ])),
2672            ],
2673            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2674        );
2675
2676        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2677        let mut buffer = Vec::with_capacity(1024);
2678        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2679        writer.write(&batch).unwrap();
2680        writer.close().unwrap();
2681        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2682            .unwrap()
2683            .collect::<Result<Vec<_>, _>>()
2684            .unwrap();
2685
2686        assert_eq!(read.len(), 1);
2687        assert_eq!(&batch, &read[0])
2688    }
2689
2690    /// Parameters for single_column_reader_test
2691    #[derive(Clone)]
2692    struct TestOptions {
2693        /// Number of row groups to write to parquet (each row group contains
2694        /// `num_rows` rows)
2695        num_row_groups: usize,
2696        /// Total number of rows per row group
2697        num_rows: usize,
2698        /// Size of batches to read back
2699        record_batch_size: usize,
2700        /// Percentage of nulls in the column, or None if the column is required (non-null)
2701        null_percent: Option<usize>,
2702        /// Set write batch size
2703        ///
2704        /// This is the number of rows that are written at once to a page and
2705        /// therefore acts as a bound on the page granularity of a row group
2706        write_batch_size: usize,
2707        /// Maximum size of page in bytes
2708        max_data_page_size: usize,
2709        /// Maximum size of dictionary page in bytes
2710        max_dict_page_size: usize,
2711        /// Writer version
2712        writer_version: WriterVersion,
2713        /// Enabled statistics
2714        enabled_statistics: EnabledStatistics,
2715        /// Encoding
2716        encoding: Encoding,
2717        /// row selections and total selected row count
2718        row_selections: Option<(RowSelection, usize)>,
2719        /// row filter
2720        row_filter: Option<Vec<bool>>,
2721        /// limit
2722        limit: Option<usize>,
2723        /// offset
2724        offset: Option<usize>,
2725    }
2726
2727    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
2728    impl std::fmt::Debug for TestOptions {
2729        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2730            f.debug_struct("TestOptions")
2731                .field("num_row_groups", &self.num_row_groups)
2732                .field("num_rows", &self.num_rows)
2733                .field("record_batch_size", &self.record_batch_size)
2734                .field("null_percent", &self.null_percent)
2735                .field("write_batch_size", &self.write_batch_size)
2736                .field("max_data_page_size", &self.max_data_page_size)
2737                .field("max_dict_page_size", &self.max_dict_page_size)
2738                .field("writer_version", &self.writer_version)
2739                .field("enabled_statistics", &self.enabled_statistics)
2740                .field("encoding", &self.encoding)
2741                .field("row_selections", &self.row_selections.is_some())
2742                .field("row_filter", &self.row_filter.is_some())
2743                .field("limit", &self.limit)
2744                .field("offset", &self.offset)
2745                .finish()
2746        }
2747    }
2748
2749    impl Default for TestOptions {
2750        fn default() -> Self {
2751            Self {
2752                num_row_groups: 2,
2753                num_rows: 100,
2754                record_batch_size: 15,
2755                null_percent: None,
2756                write_batch_size: 64,
2757                max_data_page_size: 1024 * 1024,
2758                max_dict_page_size: 1024 * 1024,
2759                writer_version: WriterVersion::PARQUET_1_0,
2760                enabled_statistics: EnabledStatistics::Page,
2761                encoding: Encoding::PLAIN,
2762                row_selections: None,
2763                row_filter: None,
2764                limit: None,
2765                offset: None,
2766            }
2767        }
2768    }
2769
2770    impl TestOptions {
2771        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2772            Self {
2773                num_row_groups,
2774                num_rows,
2775                record_batch_size,
2776                ..Default::default()
2777            }
2778        }
2779
2780        fn with_null_percent(self, null_percent: usize) -> Self {
2781            Self {
2782                null_percent: Some(null_percent),
2783                ..self
2784            }
2785        }
2786
2787        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2788            Self {
2789                max_data_page_size,
2790                ..self
2791            }
2792        }
2793
2794        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2795            Self {
2796                max_dict_page_size,
2797                ..self
2798            }
2799        }
2800
2801        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2802            Self {
2803                enabled_statistics,
2804                ..self
2805            }
2806        }
2807
2808        fn with_row_selections(self) -> Self {
2809            assert!(self.row_filter.is_none(), "Must set row selection first");
2810
2811            let mut rng = rng();
2812            let step = rng.random_range(self.record_batch_size..self.num_rows);
2813            let row_selections = create_test_selection(
2814                step,
2815                self.num_row_groups * self.num_rows,
2816                rng.random::<bool>(),
2817            );
2818            Self {
2819                row_selections: Some(row_selections),
2820                ..self
2821            }
2822        }
2823
2824        fn with_row_filter(self) -> Self {
2825            let row_count = match &self.row_selections {
2826                Some((_, count)) => *count,
2827                None => self.num_row_groups * self.num_rows,
2828            };
2829
2830            let mut rng = rng();
2831            Self {
2832                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2833                ..self
2834            }
2835        }
2836
2837        fn with_limit(self, limit: usize) -> Self {
2838            Self {
2839                limit: Some(limit),
2840                ..self
2841            }
2842        }
2843
2844        fn with_offset(self, offset: usize) -> Self {
2845            Self {
2846                offset: Some(offset),
2847                ..self
2848            }
2849        }
2850
2851        fn writer_props(&self) -> WriterProperties {
2852            let builder = WriterProperties::builder()
2853                .set_data_page_size_limit(self.max_data_page_size)
2854                .set_write_batch_size(self.write_batch_size)
2855                .set_writer_version(self.writer_version)
2856                .set_statistics_enabled(self.enabled_statistics);
2857
2858            let builder = match self.encoding {
2859                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2860                    .set_dictionary_enabled(true)
2861                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2862                _ => builder
2863                    .set_dictionary_enabled(false)
2864                    .set_encoding(self.encoding),
2865            };
2866
2867            builder.build()
2868        }
2869    }
2870
2871    /// Create a parquet file and then read it back using
2872    /// `ParquetRecordBatchReaderBuilder`, exercising a standard set of
2873    /// test parameter combinations.
2874    ///
2875    /// `rand_max` represents the maximum size of value to pass to the
2876    /// value generator
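    ///
    /// Illustrative call shape (the concrete per-type tests in this module
    /// follow this pattern; the generics and converter shown here are only an
    /// example for `Int32Type`):
    ///
    /// ```ignore
    /// run_single_column_reader_tests::<Int32Type, _, Int32Type>(
    ///     4,
    ///     ConvertedType::NONE,
    ///     Some(ArrowDataType::Int32),
    ///     |vals| Arc::new(Int32Array::from_iter(vals.iter().copied())),
    ///     &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    /// );
    /// ```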
2877    fn run_single_column_reader_tests<T, F, G>(
2878        rand_max: i32,
2879        converted_type: ConvertedType,
2880        arrow_type: Option<ArrowDataType>,
2881        converter: F,
2882        encodings: &[Encoding],
2883    ) where
2884        T: DataType,
2885        G: RandGen<T>,
2886        F: Fn(&[Option<T::T>]) -> ArrayRef,
2887    {
2888        let all_options = vec![
2889            // Choose record_batch_size (15) so batches cross row
2890            // group boundaries (2 row groups of 100 rows each).
2891            TestOptions::new(2, 100, 15),
2892            // Choose record_batch_size (5) so batches sometimes fall
2893            // on row group boundaries (3 row groups of 25 rows
2894            // each). Tests buffer
2895            // refilling edge cases.
2896            TestOptions::new(3, 25, 5),
2897            // Choose record_batch_size (25) so row group boundaries
2898            // (every 100 rows) coincide with batch boundaries. Tests buffer
2899            // refilling edge cases.
2900            TestOptions::new(4, 100, 25),
2901            // Set maximum page size so row groups have multiple pages
2902            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2903            // Set small dictionary page size to test dictionary fallback
2904            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2905            // Test optional but with no nulls
2906            TestOptions::new(2, 256, 127).with_null_percent(0),
2907            // Test optional with nulls
2908            TestOptions::new(2, 256, 93).with_null_percent(25),
2909            // Test with limit of 0
2910            TestOptions::new(4, 100, 25).with_limit(0),
2911            // Test with limit of 50
2912            TestOptions::new(4, 100, 25).with_limit(50),
2913            // Test with limit equal to number of rows
2914            TestOptions::new(4, 100, 25).with_limit(100),
2915            // Test with limit larger than number of rows
2916            TestOptions::new(4, 100, 25).with_limit(101),
2917            // Test with limit + offset less than number of rows
2918            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2919            // Test with limit + offset equal to number of rows
2920            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2921            // Test with limit + offset larger than number of rows
2922            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2923            // Test with no page-level statistics
2924            TestOptions::new(2, 256, 91)
2925                .with_null_percent(25)
2926                .with_enabled_statistics(EnabledStatistics::Chunk),
2927            // Test with no statistics
2928            TestOptions::new(2, 256, 91)
2929                .with_null_percent(25)
2930                .with_enabled_statistics(EnabledStatistics::None),
2931            // Test with all null
2932            TestOptions::new(2, 128, 91)
2933                .with_null_percent(100)
2934                .with_enabled_statistics(EnabledStatistics::None),
2935            // Test skip
2936
2937            // choose record_batch_batch (15) so batches cross row
2938            // Choose record_batch_size (15) so batches cross row
2939            // group boundaries (2 row groups of 100 rows each).
2940            // Choose record_batch_size (5) so batches sometimes fall
2941            // on row group boundaries (3 row groups of 25 rows
2942            // each). Tests buffer
2943            // refilling edge cases.
2944            TestOptions::new(3, 25, 5).with_row_selections(),
2945            // Choose record_batch_size (25) so row group boundaries
2946            // (every 100 rows) coincide with batch boundaries. Tests buffer
2947            // refilling edge cases.
2948            TestOptions::new(4, 100, 25).with_row_selections(),
2949            // Set maximum page size so row groups have multiple pages
2950            TestOptions::new(3, 256, 73)
2951                .with_max_data_page_size(128)
2952                .with_row_selections(),
2953            // Set small dictionary page size to test dictionary fallback
2954            TestOptions::new(3, 256, 57)
2955                .with_max_dict_page_size(128)
2956                .with_row_selections(),
2957            // Test optional but with no nulls
2958            TestOptions::new(2, 256, 127)
2959                .with_null_percent(0)
2960                .with_row_selections(),
2961            // Test optional with nulls
2962            TestOptions::new(2, 256, 93)
2963                .with_null_percent(25)
2964                .with_row_selections(),
2965            // Test optional with nulls and a limit
2966            TestOptions::new(2, 256, 93)
2967                .with_null_percent(25)
2968                .with_row_selections()
2969                .with_limit(10),
2970            // Test optional with nulls, an offset and a limit
2971            TestOptions::new(2, 256, 93)
2972                .with_null_percent(25)
2973                .with_row_selections()
2974                .with_offset(20)
2975                .with_limit(10),
2976            // Test filter
2977
2978            // Test with row filter
2979            TestOptions::new(4, 100, 25).with_row_filter(),
2980            // Test with row selection and row filter
2981            TestOptions::new(4, 100, 25)
2982                .with_row_selections()
2983                .with_row_filter(),
2984            // Test with nulls and row filter and small pages
2985            TestOptions::new(2, 256, 93)
2986                .with_null_percent(25)
2987                .with_max_data_page_size(10)
2988                .with_row_filter(),
2989            // Test with nulls, row selection and row filter and small pages
2990            TestOptions::new(2, 256, 93)
2991                .with_null_percent(25)
2992                .with_max_data_page_size(10)
2993                .with_row_selections()
2994                .with_row_filter(),
2995            // Test with row selection and no offset index and small pages
2996            TestOptions::new(2, 256, 93)
2997                .with_enabled_statistics(EnabledStatistics::None)
2998                .with_max_data_page_size(10)
2999                .with_row_selections(),
3000        ];
3001
3002        all_options.into_iter().for_each(|opts| {
3003            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3004                for encoding in encodings {
3005                    let opts = TestOptions {
3006                        writer_version,
3007                        encoding: *encoding,
3008                        ..opts.clone()
3009                    };
3010
3011                    single_column_reader_test::<T, _, G>(
3012                        opts,
3013                        rand_max,
3014                        converted_type,
3015                        arrow_type.clone(),
3016                        &converter,
3017                    )
3018                }
3019            }
3020        });
3021    }
3022
3023    /// Create a parquet file and then read it back using
3024    /// `ParquetRecordBatchReaderBuilder` with the parameters described in
3025    /// `opts`.
3026    fn single_column_reader_test<T, F, G>(
3027        opts: TestOptions,
3028        rand_max: i32,
3029        converted_type: ConvertedType,
3030        arrow_type: Option<ArrowDataType>,
3031        converter: F,
3032    ) where
3033        T: DataType,
3034        G: RandGen<T>,
3035        F: Fn(&[Option<T::T>]) -> ArrayRef,
3036    {
3037        // Print out options to facilitate debugging failures on CI
3038        println!(
3039            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3040            T::get_physical_type(),
3041            converted_type,
3042            arrow_type,
3043            opts
3044        );
3045
3046        // Generate def_levels according to null_percent
3047        let (repetition, def_levels) = match opts.null_percent.as_ref() {
3048            Some(null_percent) => {
3049                let mut rng = rng();
3050
3051                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3052                    .map(|_| {
3053                        std::iter::from_fn(|| {
3054                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3055                        })
3056                        .take(opts.num_rows)
3057                        .collect()
3058                    })
3059                    .collect();
3060                (Repetition::OPTIONAL, Some(def_levels))
3061            }
3062            None => (Repetition::REQUIRED, None),
3063        };
3064
3065        // Generate random table data
3066        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3067            .map(|idx| {
3068                let null_count = match def_levels.as_ref() {
3069                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3070                    None => 0,
3071                };
3072                G::gen_vec(rand_max, opts.num_rows - null_count)
3073            })
3074            .collect();
3075
3076        let len = match T::get_physical_type() {
3077            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3078            crate::basic::Type::INT96 => 12,
3079            _ => -1,
3080        };
3081
3082        let fields = vec![Arc::new(
3083            Type::primitive_type_builder("leaf", T::get_physical_type())
3084                .with_repetition(repetition)
3085                .with_converted_type(converted_type)
3086                .with_length(len)
3087                .build()
3088                .unwrap(),
3089        )];
3090
3091        let schema = Arc::new(
3092            Type::group_type_builder("test_schema")
3093                .with_fields(fields)
3094                .build()
3095                .unwrap(),
3096        );
3097
3098        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3099
3100        let mut file = tempfile::tempfile().unwrap();
3101
3102        generate_single_column_file_with_data::<T>(
3103            &values,
3104            def_levels.as_ref(),
3105            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3106            schema,
3107            arrow_field,
3108            &opts,
3109        )
3110        .unwrap();
3111
3112        file.rewind().unwrap();
3113
3114        let options = ArrowReaderOptions::new()
3115            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
3116
3117        let mut builder =
3118            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3119
3120        let expected_data = match opts.row_selections {
3121            Some((selections, row_count)) => {
3122                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3123
3124                let mut skip_data: Vec<Option<T::T>> = vec![];
3125                let dequeue: VecDeque<RowSelector> = selections.clone().into();
3126                for select in dequeue {
3127                    if select.skip {
3128                        without_skip_data.drain(0..select.row_count);
3129                    } else {
3130                        skip_data.extend(without_skip_data.drain(0..select.row_count));
3131                    }
3132                }
3133                builder = builder.with_row_selection(selections);
3134
3135                assert_eq!(skip_data.len(), row_count);
3136                skip_data
3137            }
3138            None => {
3139                // Get flattened table data
3140                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3141                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3142                expected_data
3143            }
3144        };
3145
3146        let mut expected_data = match opts.row_filter {
3147            Some(filter) => {
3148                let expected_data = expected_data
3149                    .into_iter()
3150                    .zip(filter.iter())
3151                    .filter_map(|(d, f)| f.then(|| d))
3152                    .collect();
3153
3154                let mut filter_offset = 0;
3155                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3156                    ProjectionMask::all(),
3157                    move |b| {
3158                        let array = BooleanArray::from_iter(
3159                            filter
3160                                .iter()
3161                                .skip(filter_offset)
3162                                .take(b.num_rows())
3163                                .map(|x| Some(*x)),
3164                        );
3165                        filter_offset += b.num_rows();
3166                        Ok(array)
3167                    },
3168                ))]);
3169
3170                builder = builder.with_row_filter(filter);
3171                expected_data
3172            }
3173            None => expected_data,
3174        };
3175
3176        if let Some(offset) = opts.offset {
3177            builder = builder.with_offset(offset);
3178            expected_data = expected_data.into_iter().skip(offset).collect();
3179        }
3180
3181        if let Some(limit) = opts.limit {
3182            builder = builder.with_limit(limit);
3183            expected_data = expected_data.into_iter().take(limit).collect();
3184        }
3185
3186        let mut record_reader = builder
3187            .with_batch_size(opts.record_batch_size)
3188            .build()
3189            .unwrap();
3190
3191        let mut total_read = 0;
3192        loop {
3193            let maybe_batch = record_reader.next();
3194            if total_read < expected_data.len() {
3195                let end = min(total_read + opts.record_batch_size, expected_data.len());
3196                let batch = maybe_batch.unwrap().unwrap();
3197                assert_eq!(end - total_read, batch.num_rows());
3198
3199                let a = converter(&expected_data[total_read..end]);
3200                let b = batch.column(0);
3201
3202                assert_eq!(a.data_type(), b.data_type());
3203                assert_eq!(a.to_data(), b.to_data());
3204                assert_eq!(
3205                    a.as_any().type_id(),
3206                    b.as_any().type_id(),
3207                    "incorrect type ids"
3208                );
3209
3210                total_read = end;
3211            } else {
3212                assert!(maybe_batch.is_none());
3213                break;
3214            }
3215        }
3216    }
3217
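    /// Flattens the per-row-group `values` into a single `Vec<Option<T::T>>`,
    /// emitting `None` wherever the corresponding definition level is 0.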
3218    fn gen_expected_data<T: DataType>(
3219        def_levels: Option<&Vec<Vec<i16>>>,
3220        values: &[Vec<T::T>],
3221    ) -> Vec<Option<T::T>> {
3222        let data: Vec<Option<T::T>> = match def_levels {
3223            Some(levels) => {
3224                let mut values_iter = values.iter().flatten();
3225                levels
3226                    .iter()
3227                    .flatten()
3228                    .map(|d| match d {
3229                        1 => Some(values_iter.next().cloned().unwrap()),
3230                        0 => None,
3231                        _ => unreachable!(),
3232                    })
3233                    .collect()
3234            }
3235            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3236        };
3237        data
3238    }
3239
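    /// Writes `values` to a single column parquet file using the low-level
    /// `SerializedFileWriter` API, producing one row group per entry in
    /// `values`, and returns the resulting file metadata.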
3240    fn generate_single_column_file_with_data<T: DataType>(
3241        values: &[Vec<T::T>],
3242        def_levels: Option<&Vec<Vec<i16>>>,
3243        file: File,
3244        schema: TypePtr,
3245        field: Option<Field>,
3246        opts: &TestOptions,
3247    ) -> Result<ParquetMetaData> {
3248        let mut writer_props = opts.writer_props();
3249        if let Some(field) = field {
3250            let arrow_schema = Schema::new(vec![field]);
3251            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3252        }
3253
3254        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3255
3256        for (idx, v) in values.iter().enumerate() {
3257            let def_levels = def_levels.map(|d| d[idx].as_slice());
3258            let mut row_group_writer = writer.next_row_group()?;
3259            {
3260                let mut column_writer = row_group_writer
3261                    .next_column()?
3262                    .expect("Column writer is none!");
3263
3264                column_writer
3265                    .typed::<T>()
3266                    .write_batch(v, def_levels, None)?;
3267
3268                column_writer.close()?;
3269            }
3270            row_group_writer.close()?;
3271        }
3272
3273        writer.close()
3274    }
3275
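    /// Opens `file_name` from the arrow test data directory
    /// (`arrow::util::test_util::arrow_test_data`).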
3276    fn get_test_file(file_name: &str) -> File {
3277        let mut path = PathBuf::new();
3278        path.push(arrow::util::test_util::arrow_test_data());
3279        path.push(file_name);
3280
3281        File::open(path.as_path()).expect("File not found!")
3282    }
3283
3284    #[test]
3285    fn test_read_structs() {
3286        // This particular test file has columns of struct types where there is
3287        // a column that has the same name as one of the struct fields
3288        // (see: ARROW-11452)
3289        let testdata = arrow::util::test_util::parquet_test_data();
3290        let path = format!("{testdata}/nested_structs.rust.parquet");
3291        let file = File::open(&path).unwrap();
3292        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3293
3294        for batch in record_batch_reader {
3295            batch.unwrap();
3296        }
3297
3298        let file = File::open(&path).unwrap();
3299        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3300
3301        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3302        let projected_reader = builder
3303            .with_projection(mask)
3304            .with_batch_size(60)
3305            .build()
3306            .unwrap();
3307
3308        let expected_schema = Schema::new(vec![
3309            Field::new(
3310                "roll_num",
3311                ArrowDataType::Struct(Fields::from(vec![Field::new(
3312                    "count",
3313                    ArrowDataType::UInt64,
3314                    false,
3315                )])),
3316                false,
3317            ),
3318            Field::new(
3319                "PC_CUR",
3320                ArrowDataType::Struct(Fields::from(vec![
3321                    Field::new("mean", ArrowDataType::Int64, false),
3322                    Field::new("sum", ArrowDataType::Int64, false),
3323                ])),
3324                false,
3325            ),
3326        ]);
3327
3328        // Tests for #1652 and #1654
3329        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3330
3331        for batch in projected_reader {
3332            let batch = batch.unwrap();
3333            assert_eq!(batch.schema().as_ref(), &expected_schema);
3334        }
3335    }
3336
3337    #[test]
3338    // same as test_read_structs but constructs projection mask via column names
3339    fn test_read_structs_by_name() {
3340        let testdata = arrow::util::test_util::parquet_test_data();
3341        let path = format!("{testdata}/nested_structs.rust.parquet");
3342        let file = File::open(&path).unwrap();
3343        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3344
3345        for batch in record_batch_reader {
3346            batch.unwrap();
3347        }
3348
3349        let file = File::open(&path).unwrap();
3350        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3351
3352        let mask = ProjectionMask::columns(
3353            builder.parquet_schema(),
3354            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3355        );
3356        let projected_reader = builder
3357            .with_projection(mask)
3358            .with_batch_size(60)
3359            .build()
3360            .unwrap();
3361
3362        let expected_schema = Schema::new(vec![
3363            Field::new(
3364                "roll_num",
3365                ArrowDataType::Struct(Fields::from(vec![Field::new(
3366                    "count",
3367                    ArrowDataType::UInt64,
3368                    false,
3369                )])),
3370                false,
3371            ),
3372            Field::new(
3373                "PC_CUR",
3374                ArrowDataType::Struct(Fields::from(vec![
3375                    Field::new("mean", ArrowDataType::Int64, false),
3376                    Field::new("sum", ArrowDataType::Int64, false),
3377                ])),
3378                false,
3379            ),
3380        ]);
3381
3382        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3383
3384        for batch in projected_reader {
3385            let batch = batch.unwrap();
3386            assert_eq!(batch.schema().as_ref(), &expected_schema);
3387        }
3388    }
3389
3390    #[test]
3391    fn test_read_maps() {
3392        let testdata = arrow::util::test_util::parquet_test_data();
3393        let path = format!("{testdata}/nested_maps.snappy.parquet");
3394        let file = File::open(path).unwrap();
3395        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3396
3397        for batch in record_batch_reader {
3398            batch.unwrap();
3399        }
3400    }
3401
3402    #[test]
3403    fn test_nested_nullability() {
3404        let message_type = "message nested {
3405          OPTIONAL Group group {
3406            REQUIRED INT32 leaf;
3407          }
3408        }";
3409
3410        let file = tempfile::tempfile().unwrap();
3411        let schema = Arc::new(parse_message_type(message_type).unwrap());
3412
3413        {
3414            // Write using low-level parquet API (#1167)
3415            let mut writer =
3416                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3417                    .unwrap();
3418
3419            {
3420                let mut row_group_writer = writer.next_row_group().unwrap();
3421                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3422
3423                column_writer
3424                    .typed::<Int32Type>()
3425                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3426                    .unwrap();
3427
3428                column_writer.close().unwrap();
3429                row_group_writer.close().unwrap();
3430            }
3431
3432            writer.close().unwrap();
3433        }
3434
3435        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3436        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3437
3438        let reader = builder.with_projection(mask).build().unwrap();
3439
3440        let expected_schema = Schema::new(vec![Field::new(
3441            "group",
3442            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3443            true,
3444        )]);
3445
3446        let batch = reader.into_iter().next().unwrap().unwrap();
3447        assert_eq!(batch.schema().as_ref(), &expected_schema);
3448        assert_eq!(batch.num_rows(), 4);
3449        assert_eq!(batch.column(0).null_count(), 2);
3450    }
3451
3452    #[test]
3453    fn test_invalid_utf8() {
3454        // a parquet file with 1 column with invalid utf8
3455        let data = vec![
3456            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3457            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3458            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3459            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3460            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3461            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3462            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3463            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3464            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3465            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3466            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3467            82, 49,
3468        ];
3469
3470        let file = Bytes::from(data);
3471        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3472
3473        let error = record_batch_reader.next().unwrap().unwrap_err();
3474
3475        assert!(
3476            error.to_string().contains("invalid utf-8 sequence"),
3477            "{}",
3478            error
3479        );
3480    }
3481
3482    #[test]
3483    fn test_invalid_utf8_string_array() {
3484        test_invalid_utf8_string_array_inner::<i32>();
3485    }
3486
3487    #[test]
3488    fn test_invalid_utf8_large_string_array() {
3489        test_invalid_utf8_string_array_inner::<i64>();
3490    }
3491
3492    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3493        let cases = [
3494            invalid_utf8_first_char::<O>(),
3495            invalid_utf8_first_char_long_strings::<O>(),
3496            invalid_utf8_later_char::<O>(),
3497            invalid_utf8_later_char_long_strings::<O>(),
3498            invalid_utf8_later_char_really_long_strings::<O>(),
3499            invalid_utf8_later_char_really_long_strings2::<O>(),
3500        ];
3501        for array in &cases {
3502            for encoding in STRING_ENCODINGS {
3503                // The data is not valid utf8, so we cannot safely construct a
3504                // correct StringArray; deliberately create an invalid one instead
3505                let array = unsafe {
3506                    GenericStringArray::<O>::new_unchecked(
3507                        array.offsets().clone(),
3508                        array.values().clone(),
3509                        array.nulls().cloned(),
3510                    )
3511                };
3512                let data_type = array.data_type().clone();
3513                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3514                let err = read_from_parquet(data).unwrap_err();
3515                let expected_err =
3516                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3517                assert!(
3518                    err.to_string().contains(expected_err),
3519                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3520                );
3521            }
3522        }
3523    }
3524
3525    #[test]
3526    fn test_invalid_utf8_string_view_array() {
3527        let cases = [
3528            invalid_utf8_first_char::<i32>(),
3529            invalid_utf8_first_char_long_strings::<i32>(),
3530            invalid_utf8_later_char::<i32>(),
3531            invalid_utf8_later_char_long_strings::<i32>(),
3532            invalid_utf8_later_char_really_long_strings::<i32>(),
3533            invalid_utf8_later_char_really_long_strings2::<i32>(),
3534        ];
3535
3536        for encoding in STRING_ENCODINGS {
3537            for array in &cases {
3538                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3539                let array = array.as_binary_view();
3540
3541                // The data is not valid utf8, so we cannot safely construct a
3542                // correct StringViewArray; deliberately create an invalid one instead
3543                let array = unsafe {
3544                    StringViewArray::new_unchecked(
3545                        array.views().clone(),
3546                        array.data_buffers().to_vec(),
3547                        array.nulls().cloned(),
3548                    )
3549                };
3550
3551                let data_type = array.data_type().clone();
3552                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3553                let err = read_from_parquet(data).unwrap_err();
3554                let expected_err =
3555                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
3556                assert!(
3557                    err.to_string().contains(expected_err),
3558                    "data type: {data_type}, expected: {expected_err}, got: {err}"
3559                );
3560            }
3561        }
3562    }
3563
3564    /// Encodings suitable for string data
3565    const STRING_ENCODINGS: &[Option<Encoding>] = &[
3566        None,
3567        Some(Encoding::PLAIN),
3568        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3569        Some(Encoding::DELTA_BYTE_ARRAY),
3570    ];
3571
3572    /// Invalid UTF-8 sequence in the first character
3573    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3574    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3575
3576    /// Invalid UTF-8 sequence in a character other than the first
3577    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3578    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3579
3580    /// returns a BinaryArray with invalid UTF8 data in the first character
3581    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3582        let valid: &[u8] = b"   ";
3583        let invalid = INVALID_UTF8_FIRST_CHAR;
3584        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3585    }
3586
3587    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
3588    /// string larger than 12 bytes which is handled specially when reading
3589    /// `ByteViewArray`s
3590    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3591        let valid: &[u8] = b"   ";
3592        let mut invalid = vec![];
3593        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3594        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3595        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3596    }
3597
3598    /// returns a BinaryArray with invalid UTF8 data in a character other than
3599    /// the first (this is checked in a special codepath)
3600    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3601        let valid: &[u8] = b"   ";
3602        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3603        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3604    }
3605
3606    /// returns a BinaryArray with invalid UTF8 data in a character other than
3607    /// the first in a string larger than 12 bytes which is handled specially
3608    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3609    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3610        let valid: &[u8] = b"   ";
3611        let mut invalid = vec![];
3612        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3613        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3614        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3615    }
3616
3617    /// returns a BinaryArray with invalid UTF8 data in a character other than
3618    /// the first in a string larger than 128 bytes which is handled specially
3619    /// when reading `ByteViewArray`s (this is checked in a special codepath)
3620    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3621        let valid: &[u8] = b"   ";
3622        let mut invalid = vec![];
3623        for _ in 0..10 {
3624            // each instance is 38 bytes
3625            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3626        }
3627        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3628        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3629    }
3630
3631    /// returns a BinaryArray with a small invalid UTF8 value (in a character
3632    /// other than the first) followed by a valid string longer than 128 bytes
3633    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3634        let valid: &[u8] = b"   ";
3635        let mut valid_long = vec![];
3636        for _ in 0..10 {
3637            // each instance is 38 bytes
3638            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3639        }
3640        let invalid = INVALID_UTF8_LATER_CHAR;
3641        GenericBinaryArray::<O>::from_iter(vec![
3642            None,
3643            Some(valid),
3644            Some(invalid),
3645            None,
3646            Some(&valid_long),
3647            Some(valid),
3648        ])
3649    }
3650
3651    /// writes the array into a single column parquet file with the specified
3652    /// encoding.
3653    ///
3654    /// If no encoding is specified, the default (dictionary) encoding is used
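    ///
    /// Illustrative round trip with `read_from_parquet` (assumes `array` is an
    /// `ArrayRef` containing invalid data, as in the tests above):
    ///
    /// ```ignore
    /// let data = write_to_parquet_with_encoding(array, Some(Encoding::PLAIN));
    /// let err = read_from_parquet(data).unwrap_err();
    /// ```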
3655    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3656        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3657        let mut data = vec![];
3658        let schema = batch.schema();
3659        let props = encoding.map(|encoding| {
3660            WriterProperties::builder()
3661                // must disable dictionary encoding to actually use the requested encoding
3662                .set_dictionary_enabled(false)
3663                .set_encoding(encoding)
3664                .build()
3665        });
3666
3667        {
3668            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3669            writer.write(&batch).unwrap();
3670            writer.flush().unwrap();
3671            writer.close().unwrap();
3672        };
3673        data
3674    }
3675
3676    /// read the parquet file back into a vec of record batches
3677    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3678        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3679            .unwrap()
3680            .build()
3681            .unwrap();
3682
3683        reader.collect()
3684    }
3685
3686    #[test]
3687    fn test_dictionary_preservation() {
3688        let fields = vec![Arc::new(
3689            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3690                .with_repetition(Repetition::OPTIONAL)
3691                .with_converted_type(ConvertedType::UTF8)
3692                .build()
3693                .unwrap(),
3694        )];
3695
3696        let schema = Arc::new(
3697            Type::group_type_builder("test_schema")
3698                .with_fields(fields)
3699                .build()
3700                .unwrap(),
3701        );
3702
3703        let dict_type = ArrowDataType::Dictionary(
3704            Box::new(ArrowDataType::Int32),
3705            Box::new(ArrowDataType::Utf8),
3706        );
3707
3708        let arrow_field = Field::new("leaf", dict_type, true);
3709
3710        let mut file = tempfile::tempfile().unwrap();
3711
3712        let values = vec![
3713            vec![
3714                ByteArray::from("hello"),
3715                ByteArray::from("a"),
3716                ByteArray::from("b"),
3717                ByteArray::from("d"),
3718            ],
3719            vec![
3720                ByteArray::from("c"),
3721                ByteArray::from("a"),
3722                ByteArray::from("b"),
3723            ],
3724        ];
3725
3726        let def_levels = vec![
3727            vec![1, 0, 0, 1, 0, 0, 1, 1],
3728            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3729        ];
3730
3731        let opts = TestOptions {
3732            encoding: Encoding::RLE_DICTIONARY,
3733            ..Default::default()
3734        };
3735
3736        generate_single_column_file_with_data::<ByteArrayType>(
3737            &values,
3738            Some(&def_levels),
3739            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
3740            schema,
3741            Some(arrow_field),
3742            &opts,
3743        )
3744        .unwrap();
3745
3746        file.rewind().unwrap();
3747
3748        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3749
3750        let batches = record_reader
3751            .collect::<Result<Vec<RecordBatch>, _>>()
3752            .unwrap();
3753
3754        assert_eq!(batches.len(), 6);
3755        assert!(batches.iter().all(|x| x.num_columns() == 1));
3756
3757        let row_counts = batches
3758            .iter()
3759            .map(|x| (x.num_rows(), x.column(0).null_count()))
3760            .collect::<Vec<_>>();
3761
3762        assert_eq!(
3763            row_counts,
3764            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3765        );
3766
3767        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3768
3769        // First and second batch in same row group -> same dictionary
3770        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3771        // Third batch spans row group -> computed dictionary
3772        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3773        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3774        // Fourth, fifth and sixth from same row group -> same dictionary
3775        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3776        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3777    }
3778
3779    #[test]
3780    fn test_read_null_list() {
3781        let testdata = arrow::util::test_util::parquet_test_data();
3782        let path = format!("{testdata}/null_list.parquet");
3783        let file = File::open(path).unwrap();
3784        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3785
3786        let batch = record_batch_reader.next().unwrap().unwrap();
3787        assert_eq!(batch.num_rows(), 1);
3788        assert_eq!(batch.num_columns(), 1);
3789        assert_eq!(batch.column(0).len(), 1);
3790
3791        let list = batch
3792            .column(0)
3793            .as_any()
3794            .downcast_ref::<ListArray>()
3795            .unwrap();
3796        assert_eq!(list.len(), 1);
3797        assert!(list.is_valid(0));
3798
3799        let val = list.value(0);
3800        assert_eq!(val.len(), 0);
3801    }
3802
3803    #[test]
3804    fn test_null_schema_inference() {
3805        let testdata = arrow::util::test_util::parquet_test_data();
3806        let path = format!("{testdata}/null_list.parquet");
3807        let file = File::open(path).unwrap();
3808
3809        let arrow_field = Field::new(
3810            "emptylist",
3811            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3812            true,
3813        );
3814
3815        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3816        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3817        let schema = builder.schema();
3818        assert_eq!(schema.fields().len(), 1);
3819        assert_eq!(schema.field(0), &arrow_field);
3820    }
3821
3822    #[test]
3823    fn test_skip_metadata() {
3824        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3825        let field = Field::new("col", col.data_type().clone(), true);
3826
3827        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3828
3829        let metadata = [("key".to_string(), "value".to_string())]
3830            .into_iter()
3831            .collect();
3832
3833        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3834
3835        assert_ne!(schema_with_metadata, schema_without_metadata);
3836
3837        let batch =
3838            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3839
3840        let file = |version: WriterVersion| {
3841            let props = WriterProperties::builder()
3842                .set_writer_version(version)
3843                .build();
3844
3845            let file = tempfile().unwrap();
3846            let mut writer =
3847                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3848                    .unwrap();
3849            writer.write(&batch).unwrap();
3850            writer.close().unwrap();
3851            file
3852        };
3853
3854        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3855
3856        let v1_reader = file(WriterVersion::PARQUET_1_0);
3857        let v2_reader = file(WriterVersion::PARQUET_2_0);
3858
3859        let arrow_reader =
3860            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3861        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3862
3863        let reader =
3864            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3865                .unwrap()
3866                .build()
3867                .unwrap();
3868        assert_eq!(reader.schema(), schema_without_metadata);
3869
3870        let arrow_reader =
3871            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3872        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3873
3874        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3875            .unwrap()
3876            .build()
3877            .unwrap();
3878        assert_eq!(reader.schema(), schema_without_metadata);
3879    }
3880
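    /// Writes a `RecordBatch` built from the `(name, array)` pairs in `value`
    /// to a temporary parquet file and returns the file handle.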
3881    fn write_parquet_from_iter<I, F>(value: I) -> File
3882    where
3883        I: IntoIterator<Item = (F, ArrayRef)>,
3884        F: AsRef<str>,
3885    {
3886        let batch = RecordBatch::try_from_iter(value).unwrap();
3887        let file = tempfile().unwrap();
3888        let mut writer =
3889            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3890        writer.write(&batch).unwrap();
3891        writer.close().unwrap();
3892        file
3893    }
3894
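    /// Writes `value` to a parquet file, then attempts to open it with the
    /// supplied Arrow `schema`, asserting that builder creation fails with
    /// `expected_error`.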
3895    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3896    where
3897        I: IntoIterator<Item = (F, ArrayRef)>,
3898        F: AsRef<str>,
3899    {
3900        let file = write_parquet_from_iter(value);
3901        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3902        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3903            file.try_clone().unwrap(),
3904            options_with_schema,
3905        );
3906        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3907    }
3908
3909    #[test]
3910    fn test_schema_too_few_columns() {
3911        run_schema_test_with_error(
3912            vec![
3913                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3914                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3915            ],
3916            Arc::new(Schema::new(vec![Field::new(
3917                "int64",
3918                ArrowDataType::Int64,
3919                false,
3920            )])),
3921            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3922        );
3923    }
3924
3925    #[test]
3926    fn test_schema_too_many_columns() {
3927        run_schema_test_with_error(
3928            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3929            Arc::new(Schema::new(vec![
3930                Field::new("int64", ArrowDataType::Int64, false),
3931                Field::new("int32", ArrowDataType::Int32, false),
3932            ])),
3933            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3934        );
3935    }
3936
3937    #[test]
3938    fn test_schema_mismatched_column_names() {
3939        run_schema_test_with_error(
3940            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3941            Arc::new(Schema::new(vec![Field::new(
3942                "other",
3943                ArrowDataType::Int64,
3944                false,
3945            )])),
3946            "Arrow: incompatible arrow schema, expected field named int64 got other",
3947        );
3948    }
3949
3950    #[test]
3951    fn test_schema_incompatible_columns() {
3952        run_schema_test_with_error(
3953            vec![
3954                (
3955                    "col1_invalid",
3956                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3957                ),
3958                (
3959                    "col2_valid",
3960                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3961                ),
3962                (
3963                    "col3_invalid",
3964                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3965                ),
3966            ],
3967            Arc::new(Schema::new(vec![
3968                Field::new("col1_invalid", ArrowDataType::Int32, false),
3969                Field::new("col2_valid", ArrowDataType::Int32, false),
3970                Field::new("col3_invalid", ArrowDataType::Int32, false),
3971            ])),
3972            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3973        );
3974    }
3975
3976    #[test]
3977    fn test_one_incompatible_nested_column() {
3978        let nested_fields = Fields::from(vec![
3979            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3980            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3981        ]);
3982        let nested = StructArray::try_new(
3983            nested_fields,
3984            vec![
3985                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3986                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3987            ],
3988            None,
3989        )
3990        .expect("struct array");
3991        let supplied_nested_fields = Fields::from(vec![
3992            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3993            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3994        ]);
3995        run_schema_test_with_error(
3996            vec![
3997                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3998                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3999                ("nested", Arc::new(nested) as ArrayRef),
4000            ],
4001            Arc::new(Schema::new(vec![
4002                Field::new("col1", ArrowDataType::Int64, false),
4003                Field::new("col2", ArrowDataType::Int32, false),
4004                Field::new(
4005                    "nested",
4006                    ArrowDataType::Struct(supplied_nested_fields),
4007                    false,
4008                ),
4009            ])),
4010            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4011            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4012            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4013        );
4014    }
4015
4016    /// Return parquet data with a single column of utf8 strings
4017    fn utf8_parquet() -> Bytes {
4018        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4019        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4020        let props = None;
4021        // write parquet file with non-nullable strings
4022        let mut parquet_data = vec![];
4023        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4024        writer.write(&batch).unwrap();
4025        writer.close().unwrap();
4026        Bytes::from(parquet_data)
4027    }
4028
4029    #[test]
4030    fn test_schema_error_bad_types() {
4031        // verify incompatible schemas error on read
4032        let parquet_data = utf8_parquet();
4033
4034        // Ask to read it back with an incompatible schema (int vs string)
4035        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4036            "column1",
4037            arrow::datatypes::DataType::Int32,
4038            false,
4039        )]));
4040
4041        // read it back out
4042        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4043        let err =
4044            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4045                .unwrap_err();
4046        assert_eq!(
4047            err.to_string(),
4048            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4049        )
4050    }
4051
4052    #[test]
4053    fn test_schema_error_bad_nullability() {
4054        // verify incompatible schemas error on read
4055        let parquet_data = utf8_parquet();
4056
4057        // Ask to read it back with an incompatible schema (nullability mismatch)
4058        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4059            "column1",
4060            arrow::datatypes::DataType::Utf8,
4061            true,
4062        )]));
4063
4064        // read it back out
4065        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4066        let err =
4067            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4068                .unwrap_err();
4069        assert_eq!(
4070            err.to_string(),
4071            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4072        )
4073    }
4074
4075    #[test]
4076    fn test_read_binary_as_utf8() {
4077        let file = write_parquet_from_iter(vec![
4078            (
4079                "binary_to_utf8",
4080                Arc::new(BinaryArray::from(vec![
4081                    b"one".as_ref(),
4082                    b"two".as_ref(),
4083                    b"three".as_ref(),
4084                ])) as ArrayRef,
4085            ),
4086            (
4087                "large_binary_to_large_utf8",
4088                Arc::new(LargeBinaryArray::from(vec![
4089                    b"one".as_ref(),
4090                    b"two".as_ref(),
4091                    b"three".as_ref(),
4092                ])) as ArrayRef,
4093            ),
4094            (
4095                "binary_view_to_utf8_view",
4096                Arc::new(BinaryViewArray::from(vec![
4097                    b"one".as_ref(),
4098                    b"two".as_ref(),
4099                    b"three".as_ref(),
4100                ])) as ArrayRef,
4101            ),
4102        ]);
4103        let supplied_fields = Fields::from(vec![
4104            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4105            Field::new(
4106                "large_binary_to_large_utf8",
4107                ArrowDataType::LargeUtf8,
4108                false,
4109            ),
4110            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4111        ]);
4112
4113        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4114        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4115            file.try_clone().unwrap(),
4116            options,
4117        )
4118        .expect("reader builder with schema")
4119        .build()
4120        .expect("reader with schema");
4121
4122        let batch = arrow_reader.next().unwrap().unwrap();
4123        assert_eq!(batch.num_columns(), 3);
4124        assert_eq!(batch.num_rows(), 3);
4125        assert_eq!(
4126            batch
4127                .column(0)
4128                .as_string::<i32>()
4129                .iter()
4130                .collect::<Vec<_>>(),
4131            vec![Some("one"), Some("two"), Some("three")]
4132        );
4133
4134        assert_eq!(
4135            batch
4136                .column(1)
4137                .as_string::<i64>()
4138                .iter()
4139                .collect::<Vec<_>>(),
4140            vec![Some("one"), Some("two"), Some("three")]
4141        );
4142
4143        assert_eq!(
4144            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4145            vec![Some("one"), Some("two"), Some("three")]
4146        );
4147    }
4148
4149    #[test]
4150    #[should_panic(expected = "Invalid UTF8 sequence at")]
4151    fn test_read_non_utf8_binary_as_utf8() {
4152        let file = write_parquet_from_iter(vec![(
4153            "non_utf8_binary",
4154            Arc::new(BinaryArray::from(vec![
4155                b"\xDE\x00\xFF".as_ref(),
4156                b"\xDE\x01\xAA".as_ref(),
4157                b"\xDE\x02\xFF".as_ref(),
4158            ])) as ArrayRef,
4159        )]);
4160        let supplied_fields = Fields::from(vec![Field::new(
4161            "non_utf8_binary",
4162            ArrowDataType::Utf8,
4163            false,
4164        )]);
4165
4166        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4167        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4168            file.try_clone().unwrap(),
4169            options,
4170        )
4171        .expect("reader builder with schema")
4172        .build()
4173        .expect("reader with schema");
4174        arrow_reader.next().unwrap().unwrap_err();
4175    }
4176
4177    #[test]
4178    fn test_with_schema() {
4179        let nested_fields = Fields::from(vec![
4180            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4181            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4182        ]);
4183
4184        let nested_arrays: Vec<ArrayRef> = vec![
4185            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4186            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4187        ];
4188
4189        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4190
4191        let file = write_parquet_from_iter(vec![
4192            (
4193                "int32_to_ts_second",
4194                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4195            ),
4196            (
4197                "date32_to_date64",
4198                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4199            ),
4200            ("nested", Arc::new(nested) as ArrayRef),
4201        ]);
4202
4203        let supplied_nested_fields = Fields::from(vec![
4204            Field::new(
4205                "utf8_to_dict",
4206                ArrowDataType::Dictionary(
4207                    Box::new(ArrowDataType::Int32),
4208                    Box::new(ArrowDataType::Utf8),
4209                ),
4210                false,
4211            ),
4212            Field::new(
4213                "int64_to_ts_nano",
4214                ArrowDataType::Timestamp(
4215                    arrow::datatypes::TimeUnit::Nanosecond,
4216                    Some("+10:00".into()),
4217                ),
4218                false,
4219            ),
4220        ]);
4221
4222        let supplied_schema = Arc::new(Schema::new(vec![
4223            Field::new(
4224                "int32_to_ts_second",
4225                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4226                false,
4227            ),
4228            Field::new("date32_to_date64", ArrowDataType::Date64, false),
4229            Field::new(
4230                "nested",
4231                ArrowDataType::Struct(supplied_nested_fields),
4232                false,
4233            ),
4234        ]));
4235
4236        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4237        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4238            file.try_clone().unwrap(),
4239            options,
4240        )
4241        .expect("reader builder with schema")
4242        .build()
4243        .expect("reader with schema");
4244
4245        assert_eq!(arrow_reader.schema(), supplied_schema);
4246        let batch = arrow_reader.next().unwrap().unwrap();
4247        assert_eq!(batch.num_columns(), 3);
4248        assert_eq!(batch.num_rows(), 4);
4249        assert_eq!(
4250            batch
4251                .column(0)
4252                .as_any()
4253                .downcast_ref::<TimestampSecondArray>()
4254                .expect("downcast to timestamp second")
4255                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4256                .map(|v| v.to_string())
4257                .expect("value as datetime"),
4258            "1970-01-01 01:00:00 +01:00"
4259        );
4260        assert_eq!(
4261            batch
4262                .column(1)
4263                .as_any()
4264                .downcast_ref::<Date64Array>()
4265                .expect("downcast to date64")
4266                .value_as_date(0)
4267                .map(|v| v.to_string())
4268                .expect("value as date"),
4269            "1970-01-01"
4270        );
4271
4272        let nested = batch
4273            .column(2)
4274            .as_any()
4275            .downcast_ref::<StructArray>()
4276            .expect("downcast to struct");
4277
4278        let nested_dict = nested
4279            .column(0)
4280            .as_any()
4281            .downcast_ref::<Int32DictionaryArray>()
4282            .expect("downcast to dictionary");
4283
4284        assert_eq!(
4285            nested_dict
4286                .values()
4287                .as_any()
4288                .downcast_ref::<StringArray>()
4289                .expect("downcast to string")
4290                .iter()
4291                .collect::<Vec<_>>(),
4292            vec![Some("a"), Some("b")]
4293        );
4294
4295        assert_eq!(
4296            nested_dict.keys().iter().collect::<Vec<_>>(),
4297            vec![Some(0), Some(0), Some(0), Some(1)]
4298        );
4299
4300        assert_eq!(
4301            nested
4302                .column(1)
4303                .as_any()
4304                .downcast_ref::<TimestampNanosecondArray>()
4305                .expect("downcast to timestamp nanosecond")
4306                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4307                .map(|v| v.to_string())
4308                .expect("value as datetime"),
4309            "1970-01-01 10:00:00.000000001 +10:00"
4310        );
4311    }
4312
4313    #[test]
4314    fn test_empty_projection() {
4315        let testdata = arrow::util::test_util::parquet_test_data();
4316        let path = format!("{testdata}/alltypes_plain.parquet");
4317        let file = File::open(path).unwrap();
4318
4319        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4320        let file_metadata = builder.metadata().file_metadata();
4321        let expected_rows = file_metadata.num_rows() as usize;
4322
4323        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4324        let batch_reader = builder
4325            .with_projection(mask)
4326            .with_batch_size(2)
4327            .build()
4328            .unwrap();
4329
4330        let mut total_rows = 0;
4331        for maybe_batch in batch_reader {
4332            let batch = maybe_batch.unwrap();
4333            total_rows += batch.num_rows();
4334            assert_eq!(batch.num_columns(), 0);
4335            assert!(batch.num_rows() <= 2);
4336        }
4337
4338        assert_eq!(total_rows, expected_rows);
4339    }
4340
4341    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4342        let schema = Arc::new(Schema::new(vec![Field::new(
4343            "list",
4344            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4345            true,
4346        )]));
4347
4348        let mut buf = Vec::with_capacity(1024);
4349
4350        let mut writer = ArrowWriter::try_new(
4351            &mut buf,
4352            schema.clone(),
4353            Some(
4354                WriterProperties::builder()
4355                    .set_max_row_group_size(row_group_size)
4356                    .build(),
4357            ),
4358        )
4359        .unwrap();
4360        for _ in 0..2 {
4361            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4362            for _ in 0..(batch_size) {
4363                list_builder.append(true);
4364            }
4365            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4366                .unwrap();
4367            writer.write(&batch).unwrap();
4368        }
4369        writer.close().unwrap();
4370
4371        let mut record_reader =
4372            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4373        assert_eq!(
4374            batch_size,
4375            record_reader.next().unwrap().unwrap().num_rows()
4376        );
4377        assert_eq!(
4378            batch_size,
4379            record_reader.next().unwrap().unwrap().num_rows()
4380        );
4381    }
4382
4383    #[test]
4384    fn test_row_group_exact_multiple() {
4385        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4386        test_row_group_batch(8, 8);
4387        test_row_group_batch(10, 8);
4388        test_row_group_batch(8, 10);
4389        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4390        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4391        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4392        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4393        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4394    }
4395
4396    /// Given a RecordBatch containing all the column data, return the expected batches given
4397    /// a `batch_size` and `selection`
4398    fn get_expected_batches(
4399        column: &RecordBatch,
4400        selection: &RowSelection,
4401        batch_size: usize,
4402    ) -> Vec<RecordBatch> {
4403        let mut expected_batches = vec![];
4404
4405        let mut selection: VecDeque<_> = selection.clone().into();
4406        let mut row_offset = 0;
4407        let mut last_start = None;
4408        while row_offset < column.num_rows() && !selection.is_empty() {
4409            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4410            while batch_remaining > 0 && !selection.is_empty() {
4411                let (to_read, skip) = match selection.front_mut() {
4412                    Some(selection) if selection.row_count > batch_remaining => {
4413                        selection.row_count -= batch_remaining;
4414                        (batch_remaining, selection.skip)
4415                    }
4416                    Some(_) => {
4417                        let select = selection.pop_front().unwrap();
4418                        (select.row_count, select.skip)
4419                    }
4420                    None => break,
4421                };
4422
4423                batch_remaining -= to_read;
4424
4425                match skip {
4426                    true => {
4427                        if let Some(last_start) = last_start.take() {
4428                            expected_batches.push(column.slice(last_start, row_offset - last_start))
4429                        }
4430                        row_offset += to_read
4431                    }
4432                    false => {
4433                        last_start.get_or_insert(row_offset);
4434                        row_offset += to_read
4435                    }
4436                }
4437            }
4438        }
4439
4440        if let Some(last_start) = last_start.take() {
4441            expected_batches.push(column.slice(last_start, row_offset - last_start))
4442        }
4443
4444        // Sanity check: all batches except the final one should have `batch_size` rows
4445        for batch in &expected_batches[..expected_batches.len() - 1] {
4446            assert_eq!(batch.num_rows(), batch_size);
4447        }
4448
4449        expected_batches
4450    }
4451
4452    fn create_test_selection(
4453        step_len: usize,
4454        total_len: usize,
4455        skip_first: bool,
4456    ) -> (RowSelection, usize) {
4457        let mut remaining = total_len;
4458        let mut skip = skip_first;
4459        let mut vec = vec![];
4460        let mut selected_count = 0;
4461        while remaining != 0 {
4462            let step = if remaining > step_len {
4463                step_len
4464            } else {
4465                remaining
4466            };
4467            vec.push(RowSelector {
4468                row_count: step,
4469                skip,
4470            });
4471            remaining -= step;
4472            if !skip {
4473                selected_count += step;
4474            }
4475            skip = !skip;
4476        }
4477        (vec.into(), selected_count)
4478    }
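
    // A minimal sketch (not part of the original suite) of how `create_test_selection`
    // behaves: with a step of 2 over 5 total rows and `skip_first = false` it
    // alternates select/skip runs and reports the number of selected rows.
    #[test]
    fn test_create_test_selection_sketch() {
        let (selection, selected) = create_test_selection(2, 5, false);
        let selectors: Vec<RowSelector> = selection.into();
        assert_eq!(
            selectors,
            vec![
                RowSelector::select(2),
                RowSelector::skip(2),
                RowSelector::select(1)
            ]
        );
        assert_eq!(selected, 3);
    }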
4479
4480    #[test]
4481    fn test_scan_row_with_selection() {
4482        let testdata = arrow::util::test_util::parquet_test_data();
4483        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4484        let test_file = File::open(&path).unwrap();
4485
4486        let mut serial_reader =
4487            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4488        let data = serial_reader.next().unwrap().unwrap();
4489
4490        let do_test = |batch_size: usize, selection_len: usize| {
4491            for skip_first in [false, true] {
4492                let selections = create_test_selection(selection_len, data.num_rows(), skip_first).0;
4493
4494                let expected = get_expected_batches(&data, &selections, batch_size);
4495                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4496                assert_eq!(
4497                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4498                    expected,
4499                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4500                );
4501            }
4502        };
4503
4504        // total row count 7300
4505        // 1. test selection len more than one page row count
4506        do_test(1000, 1000);
4507
4508        // 2. test selection len less than one page row count
4509        do_test(20, 20);
4510
4511        // 3. test selection_len less than batch_size
4512        do_test(20, 5);
4513
4514        // 4. test selection_len more than batch_size
4515        //    (i.e. batch_size < selection_len)
4516        do_test(5, 20);
4517
4518        fn create_skip_reader(
4519            test_file: &File,
4520            batch_size: usize,
4521            selections: RowSelection,
4522        ) -> ParquetRecordBatchReader {
4523            let options = ArrowReaderOptions::new().with_page_index(true);
4524            let file = test_file.try_clone().unwrap();
4525            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4526                .unwrap()
4527                .with_batch_size(batch_size)
4528                .with_row_selection(selections)
4529                .build()
4530                .unwrap()
4531        }
4532    }
4533
4534    #[test]
4535    fn test_batch_size_overallocate() {
4536        let testdata = arrow::util::test_util::parquet_test_data();
4537        // `alltypes_plain.parquet` only has 8 rows
4538        let path = format!("{testdata}/alltypes_plain.parquet");
4539        let test_file = File::open(path).unwrap();
4540
4541        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4542        let num_rows = builder.metadata.file_metadata().num_rows();
4543        let reader = builder
4544            .with_batch_size(1024)
4545            .with_projection(ProjectionMask::all())
4546            .build()
4547            .unwrap();
4548        assert_ne!(1024, num_rows);
4549        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4550    }
4551
4552    #[test]
4553    fn test_read_with_page_index_enabled() {
4554        let testdata = arrow::util::test_util::parquet_test_data();
4555
4556        {
4557            // `alltypes_tiny_pages.parquet` has page index
4558            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4559            let test_file = File::open(path).unwrap();
4560            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4561                test_file,
4562                ArrowReaderOptions::new().with_page_index(true),
4563            )
4564            .unwrap();
4565            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4566            let reader = builder.build().unwrap();
4567            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4568            assert_eq!(batches.len(), 8);
4569        }
4570
4571        {
4572            // `alltypes_plain.parquet` doesn't have page index
4573            let path = format!("{testdata}/alltypes_plain.parquet");
4574            let test_file = File::open(path).unwrap();
4575            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4576                test_file,
4577                ArrowReaderOptions::new().with_page_index(true),
4578            )
4579            .unwrap();
4580            // Although the `Vec<Vec<PageLocation>>` of each row group is empty,
4581            // the file should still be read successfully.
4582            assert!(builder.metadata().offset_index().is_none());
4583            let reader = builder.build().unwrap();
4584            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4585            assert_eq!(batches.len(), 1);
4586        }
4587    }
4588
4589    #[test]
4590    fn test_raw_repetition() {
4591        const MESSAGE_TYPE: &str = "
4592            message Log {
4593              OPTIONAL INT32 eventType;
4594              REPEATED INT32 category;
4595              REPEATED group filter {
4596                OPTIONAL INT32 error;
4597              }
4598            }
4599        ";
4600        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4601        let props = Default::default();
4602
4603        let mut buf = Vec::with_capacity(1024);
4604        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4605        let mut row_group_writer = writer.next_row_group().unwrap();
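        // Definition / repetition level semantics used below: a repetition level
        // of 0 starts a new record while 1 continues the current repeated field,
        // and a definition level of 1 means the optional/repeated value is
        // present. All three columns therefore contribute to the same single row.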
4606
4607        // column 0
4608        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4609        col_writer
4610            .typed::<Int32Type>()
4611            .write_batch(&[1], Some(&[1]), None)
4612            .unwrap();
4613        col_writer.close().unwrap();
4614        // column 1
4615        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4616        col_writer
4617            .typed::<Int32Type>()
4618            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4619            .unwrap();
4620        col_writer.close().unwrap();
4621        // column 2
4622        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4623        col_writer
4624            .typed::<Int32Type>()
4625            .write_batch(&[1], Some(&[1]), Some(&[0]))
4626            .unwrap();
4627        col_writer.close().unwrap();
4628
4629        let rg_md = row_group_writer.close().unwrap();
4630        assert_eq!(rg_md.num_rows(), 1);
4631        writer.close().unwrap();
4632
4633        let bytes = Bytes::from(buf);
4634
4635        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4636        let full = no_mask.next().unwrap().unwrap();
4637
4638        assert_eq!(full.num_columns(), 3);
4639
4640        for idx in 0..3 {
4641            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4642            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4643            let mut reader = b.with_projection(mask).build().unwrap();
4644            let projected = reader.next().unwrap().unwrap();
4645
4646            assert_eq!(projected.num_columns(), 1);
4647            assert_eq!(full.column(idx), projected.column(0));
4648        }
4649    }
4650
4651    #[test]
4652    fn test_read_lz4_raw() {
4653        let testdata = arrow::util::test_util::parquet_test_data();
4654        let path = format!("{testdata}/lz4_raw_compressed.parquet");
4655        let file = File::open(path).unwrap();
4656
4657        let batches = ParquetRecordBatchReader::try_new(file, 1024)
4658            .unwrap()
4659            .collect::<Result<Vec<_>, _>>()
4660            .unwrap();
4661        assert_eq!(batches.len(), 1);
4662        let batch = &batches[0];
4663
4664        assert_eq!(batch.num_columns(), 3);
4665        assert_eq!(batch.num_rows(), 4);
4666
4667        // https://github.com/apache/parquet-testing/pull/18
4668        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4669        assert_eq!(
4670            a.values(),
4671            &[1593604800, 1593604800, 1593604801, 1593604801]
4672        );
4673
4674        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4675        let a: Vec<_> = a.iter().flatten().collect();
4676        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4677
4678        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4679        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4680    }
4681
4682    // This test ensures backward compatibility. It reads 2 files that declare the LZ4 CompressionCodec
4683    // but use different compression algorithms: LZ4_HADOOP and LZ4_RAW.
4684    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4685    //    LZ4_HADOOP algorithm for compression.
4686    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
4687    //    LZ4_RAW algorithm for compression. Reading it exercises the fallback kept for backward
4688    //    compatibility with older parquet-cpp versions.
4689    //
4690    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
4691    #[test]
4692    fn test_read_lz4_hadoop_fallback() {
4693        for file in [
4694            "hadoop_lz4_compressed.parquet",
4695            "non_hadoop_lz4_compressed.parquet",
4696        ] {
4697            let testdata = arrow::util::test_util::parquet_test_data();
4698            let path = format!("{testdata}/{file}");
4699            let file = File::open(path).unwrap();
4700            let expected_rows = 4;
4701
4702            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4703                .unwrap()
4704                .collect::<Result<Vec<_>, _>>()
4705                .unwrap();
4706            assert_eq!(batches.len(), 1);
4707            let batch = &batches[0];
4708
4709            assert_eq!(batch.num_columns(), 3);
4710            assert_eq!(batch.num_rows(), expected_rows);
4711
4712            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4713            assert_eq!(
4714                a.values(),
4715                &[1593604800, 1593604800, 1593604801, 1593604801]
4716            );
4717
4718            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4719            let b: Vec<_> = b.iter().flatten().collect();
4720            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4721
4722            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4723            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4724        }
4725    }
4726
4727    #[test]
4728    fn test_read_lz4_hadoop_large() {
4729        let testdata = arrow::util::test_util::parquet_test_data();
4730        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4731        let file = File::open(path).unwrap();
4732        let expected_rows = 10000;
4733
4734        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4735            .unwrap()
4736            .collect::<Result<Vec<_>, _>>()
4737            .unwrap();
4738        assert_eq!(batches.len(), 1);
4739        let batch = &batches[0];
4740
4741        assert_eq!(batch.num_columns(), 1);
4742        assert_eq!(batch.num_rows(), expected_rows);
4743
4744        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4745        let a: Vec<_> = a.iter().flatten().collect();
4746        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4747        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4748        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4749        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4750    }
4751
4752    #[test]
4753    #[cfg(feature = "snap")]
4754    fn test_read_nested_lists() {
4755        let testdata = arrow::util::test_util::parquet_test_data();
4756        let path = format!("{testdata}/nested_lists.snappy.parquet");
4757        let file = File::open(path).unwrap();
4758
4759        let f = file.try_clone().unwrap();
4760        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4761        let expected = reader.next().unwrap().unwrap();
4762        assert_eq!(expected.num_rows(), 3);
4763
4764        let selection = RowSelection::from(vec![
4765            RowSelector::skip(1),
4766            RowSelector::select(1),
4767            RowSelector::skip(1),
4768        ]);
4769        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4770            .unwrap()
4771            .with_row_selection(selection)
4772            .build()
4773            .unwrap();
4774
4775        let actual = reader.next().unwrap().unwrap();
4776        assert_eq!(actual.num_rows(), 1);
4777        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4778    }
4779
4780    #[test]
4781    fn test_arbitrary_decimal() {
4782        let values = [1, 2, 3, 4, 5, 6, 7, 8];
4783        let decimals_19_0 = Decimal128Array::from_iter_values(values)
4784            .with_precision_and_scale(19, 0)
4785            .unwrap();
4786        let decimals_12_0 = Decimal128Array::from_iter_values(values)
4787            .with_precision_and_scale(12, 0)
4788            .unwrap();
4789        let decimals_17_10 = Decimal128Array::from_iter_values(values)
4790            .with_precision_and_scale(17, 10)
4791            .unwrap();
4792
4793        let written = RecordBatch::try_from_iter([
4794            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4795            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4796            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4797        ])
4798        .unwrap();
4799
4800        let mut buffer = Vec::with_capacity(1024);
4801        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4802        writer.write(&written).unwrap();
4803        writer.close().unwrap();
4804
4805        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4806            .unwrap()
4807            .collect::<Result<Vec<_>, _>>()
4808            .unwrap();
4809
4810        assert_eq!(&written.slice(0, 8), &read[0]);
4811    }
4812
4813    #[test]
4814    fn test_list_skip() {
4815        let mut list = ListBuilder::new(Int32Builder::new());
4816        list.append_value([Some(1), Some(2)]);
4817        list.append_value([Some(3)]);
4818        list.append_value([Some(4)]);
4819        let list = list.finish();
4820        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4821
4822        // First page contains 2 values but only 1 row
4823        let props = WriterProperties::builder()
4824            .set_data_page_row_count_limit(1)
4825            .set_write_batch_size(2)
4826            .build();
4827
4828        let mut buffer = Vec::with_capacity(1024);
4829        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4830        writer.write(&batch).unwrap();
4831        writer.close().unwrap();
4832
4833        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4834        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4835            .unwrap()
4836            .with_row_selection(selection.into())
4837            .build()
4838            .unwrap();
4839        let out = reader.next().unwrap().unwrap();
4840        assert_eq!(out.num_rows(), 1);
4841        assert_eq!(out, batch.slice(2, 1));
4842    }
4843
4844    #[test]
4845    fn test_row_selection_interleaved_skip() -> Result<()> {
4846        let schema = Arc::new(Schema::new(vec![Field::new(
4847            "v",
4848            ArrowDataType::Int32,
4849            false,
4850        )]));
4851
4852        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
4853        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
4854
4855        let mut buffer = Vec::with_capacity(1024);
4856        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
4857        writer.write(&batch)?;
4858        writer.close()?;
4859
4860        let selection = RowSelection::from(vec![
4861            RowSelector::select(1),
4862            RowSelector::skip(2),
4863            RowSelector::select(2),
4864        ]);
4865
4866        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
4867            .with_batch_size(4)
4868            .with_row_selection(selection)
4869            .build()?;
4870
4871        let out = reader.next().unwrap()?;
4872        assert_eq!(out.num_rows(), 3);
4873        let values = out
4874            .column(0)
4875            .as_primitive::<arrow_array::types::Int32Type>()
4876            .values();
4877        assert_eq!(values, &[0, 3, 4]);
4878        assert!(reader.next().is_none());
4879        Ok(())
4880    }
4881
4882    #[test]
4883    fn test_row_selection_mask_sparse_rows() -> Result<()> {
4884        let schema = Arc::new(Schema::new(vec![Field::new(
4885            "v",
4886            ArrowDataType::Int32,
4887            false,
4888        )]));
4889
4890        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
4891        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
4892
4893        let mut buffer = Vec::with_capacity(1024);
4894        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
4895        writer.write(&batch)?;
4896        writer.close()?;
4897
4898        let total_rows = batch.num_rows();
4899        let ranges = (1..total_rows)
4900            .step_by(2)
4901            .map(|i| i..i + 1)
4902            .collect::<Vec<_>>();
4903        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
4904
4905        let selectors: Vec<RowSelector> = selection.clone().into();
4906        assert!(total_rows < selectors.len() * 8);
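        // The assertion above confirms the selection is fragmented into many short
        // runs, the shape where a bitmask-backed read plan can be more compact than
        // a list of selectors.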
4907
4908        let bytes = Bytes::from(buffer);
4909
4910        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
4911            .with_batch_size(7)
4912            .with_row_selection(selection)
4913            .build()?;
4914
4915        let mut collected = Vec::new();
4916        for batch in reader {
4917            let batch = batch?;
4918            collected.extend_from_slice(
4919                batch
4920                    .column(0)
4921                    .as_primitive::<arrow_array::types::Int32Type>()
4922                    .values(),
4923            );
4924        }
4925
4926        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
4927        assert_eq!(collected, expected);
4928        Ok(())
4929    }
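
    // A minimal sketch (not part of the original suite) of
    // `RowSelection::from_consecutive_ranges`, which the test above relies on:
    // consecutive selected ranges expand into alternating skip/select runs.
    #[test]
    fn test_row_selection_from_consecutive_ranges_sketch() {
        let selection = RowSelection::from_consecutive_ranges([1..2, 4..8].into_iter(), 8);
        let selectors: Vec<RowSelector> = selection.into();
        assert_eq!(
            selectors,
            vec![
                RowSelector::skip(1),
                RowSelector::select(1),
                RowSelector::skip(2),
                RowSelector::select(4)
            ]
        );
    }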
4930
4931    fn test_decimal32_roundtrip() {
4932        let d = |values: Vec<i32>, p: u8| {
4933            let iter = values.into_iter();
4934            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4935                .with_precision_and_scale(p, 2)
4936                .unwrap()
4937        };
4938
4939        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4940        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4941
4942        let mut buffer = Vec::with_capacity(1024);
4943        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4944        writer.write(&batch).unwrap();
4945        writer.close().unwrap();
4946
4947        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4948        let t1 = builder.parquet_schema().columns()[0].physical_type();
4949        assert_eq!(t1, PhysicalType::INT32);
4950
4951        let mut reader = builder.build().unwrap();
4952        assert_eq!(batch.schema(), reader.schema());
4953
4954        let out = reader.next().unwrap().unwrap();
4955        assert_eq!(batch, out);
4956    }
4957
4958    fn test_decimal64_roundtrip() {
4959        // Precision <= 9 -> INT32
4960        // Precision <= 18 -> INT64
4961
4962        let d = |values: Vec<i64>, p: u8| {
4963            let iter = values.into_iter();
4964            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4965                .with_precision_and_scale(p, 2)
4966                .unwrap()
4967        };
4968
4969        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4970        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4971        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4972
4973        let batch = RecordBatch::try_from_iter([
4974            ("d1", Arc::new(d1) as ArrayRef),
4975            ("d2", Arc::new(d2) as ArrayRef),
4976            ("d3", Arc::new(d3) as ArrayRef),
4977        ])
4978        .unwrap();
4979
4980        let mut buffer = Vec::with_capacity(1024);
4981        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4982        writer.write(&batch).unwrap();
4983        writer.close().unwrap();
4984
4985        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4986        let t1 = builder.parquet_schema().columns()[0].physical_type();
4987        assert_eq!(t1, PhysicalType::INT32);
4988        let t2 = builder.parquet_schema().columns()[1].physical_type();
4989        assert_eq!(t2, PhysicalType::INT64);
4990        let t3 = builder.parquet_schema().columns()[2].physical_type();
4991        assert_eq!(t3, PhysicalType::INT64);
4992
4993        let mut reader = builder.build().unwrap();
4994        assert_eq!(batch.schema(), reader.schema());
4995
4996        let out = reader.next().unwrap().unwrap();
4997        assert_eq!(batch, out);
4998    }
4999
5000    fn test_decimal_roundtrip<T: DecimalType>() {
5001        // Precision <= 9 -> INT32
5002        // Precision <= 18 -> INT64
5003        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
5004
5005        let d = |values: Vec<usize>, p: u8| {
5006            let iter = values.into_iter().map(T::Native::usize_as);
5007            PrimitiveArray::<T>::from_iter_values(iter)
5008                .with_precision_and_scale(p, 2)
5009                .unwrap()
5010        };
5011
5012        let d1 = d(vec![1, 2, 3, 4, 5], 9);
5013        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5014        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5015        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5016
5017        let batch = RecordBatch::try_from_iter([
5018            ("d1", Arc::new(d1) as ArrayRef),
5019            ("d2", Arc::new(d2) as ArrayRef),
5020            ("d3", Arc::new(d3) as ArrayRef),
5021            ("d4", Arc::new(d4) as ArrayRef),
5022        ])
5023        .unwrap();
5024
5025        let mut buffer = Vec::with_capacity(1024);
5026        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5027        writer.write(&batch).unwrap();
5028        writer.close().unwrap();
5029
5030        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5031        let t1 = builder.parquet_schema().columns()[0].physical_type();
5032        assert_eq!(t1, PhysicalType::INT32);
5033        let t2 = builder.parquet_schema().columns()[1].physical_type();
5034        assert_eq!(t2, PhysicalType::INT64);
5035        let t3 = builder.parquet_schema().columns()[2].physical_type();
5036        assert_eq!(t3, PhysicalType::INT64);
5037        let t4 = builder.parquet_schema().columns()[3].physical_type();
5038        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5039
5040        let mut reader = builder.build().unwrap();
5041        assert_eq!(batch.schema(), reader.schema());
5042
5043        let out = reader.next().unwrap().unwrap();
5044        assert_eq!(batch, out);
5045    }
5046
5047    #[test]
5048    fn test_decimal() {
5049        test_decimal32_roundtrip();
5050        test_decimal64_roundtrip();
5051        test_decimal_roundtrip::<Decimal128Type>();
5052        test_decimal_roundtrip::<Decimal256Type>();
5053    }
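
    // A minimal sketch (not part of the original suite) of the precision ->
    // physical type mapping exercised above, checked with a single Decimal128
    // column per precision class instead of a full roundtrip.
    #[test]
    fn test_decimal_physical_type_mapping_sketch() {
        for (precision, expected) in [
            (9u8, PhysicalType::INT32),
            (18u8, PhysicalType::INT64),
            (19u8, PhysicalType::FIXED_LEN_BYTE_ARRAY),
        ] {
            let array = Decimal128Array::from_iter_values([1, 2, 3])
                .with_precision_and_scale(precision, 2)
                .unwrap();
            let batch = RecordBatch::try_from_iter([("d", Arc::new(array) as ArrayRef)]).unwrap();

            let mut buffer = Vec::new();
            let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();

            let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
            assert_eq!(
                builder.parquet_schema().columns()[0].physical_type(),
                expected
            );
        }
    }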
5054
5055    #[test]
5056    fn test_list_selection() {
5057        let schema = Arc::new(Schema::new(vec![Field::new_list(
5058            "list",
5059            Field::new_list_field(ArrowDataType::Utf8, true),
5060            false,
5061        )]));
5062        let mut buf = Vec::with_capacity(1024);
5063
5064        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5065
5066        for i in 0..2 {
5067            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5068            for j in 0..1024 {
5069                list_a_builder.values().append_value(format!("{i} {j}"));
5070                list_a_builder.append(true);
5071            }
5072            let batch =
5073                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5074                    .unwrap();
5075            writer.write(&batch).unwrap();
5076        }
5077        let _metadata = writer.close().unwrap();
5078
5079        let buf = Bytes::from(buf);
5080        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5081            .unwrap()
5082            .with_row_selection(RowSelection::from(vec![
5083                RowSelector::skip(100),
5084                RowSelector::select(924),
5085                RowSelector::skip(100),
5086                RowSelector::select(924),
5087            ]))
5088            .build()
5089            .unwrap();
5090
5091        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5092        let batch = concat_batches(&schema, &batches).unwrap();
5093
5094        assert_eq!(batch.num_rows(), 924 * 2);
5095        let list = batch.column(0).as_list::<i32>();
5096
5097        for w in list.value_offsets().windows(2) {
5098            assert_eq!(w[0] + 1, w[1])
5099        }
5100        let mut values = list.values().as_string::<i32>().iter();
5101
5102        for i in 0..2 {
5103            for j in 100..1024 {
5104                let expected = format!("{i} {j}");
5105                assert_eq!(values.next().unwrap().unwrap(), &expected);
5106            }
5107        }
5108    }
5109
5110    #[test]
5111    fn test_list_selection_fuzz() {
5112        let mut rng = rng();
5113        let schema = Arc::new(Schema::new(vec![Field::new_list(
5114            "list",
5115            Field::new_list(
5116                Field::LIST_FIELD_DEFAULT_NAME,
5117                Field::new_list_field(ArrowDataType::Int32, true),
5118                true,
5119            ),
5120            true,
5121        )]));
5122        let mut buf = Vec::with_capacity(1024);
5123        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5124
5125        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5126
5127        for _ in 0..2048 {
5128            if rng.random_bool(0.2) {
5129                list_a_builder.append(false);
5130                continue;
5131            }
5132
5133            let list_a_len = rng.random_range(0..10);
5134            let list_b_builder = list_a_builder.values();
5135
5136            for _ in 0..list_a_len {
5137                if rng.random_bool(0.2) {
5138                    list_b_builder.append(false);
5139                    continue;
5140                }
5141
5142                let list_b_len = rng.random_range(0..10);
5143                let int_builder = list_b_builder.values();
5144                for _ in 0..list_b_len {
5145                    match rng.random_bool(0.2) {
5146                        true => int_builder.append_null(),
5147                        false => int_builder.append_value(rng.random()),
5148                    }
5149                }
5150                list_b_builder.append(true)
5151            }
5152            list_a_builder.append(true);
5153        }
5154
5155        let array = Arc::new(list_a_builder.finish());
5156        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5157
5158        writer.write(&batch).unwrap();
5159        let _metadata = writer.close().unwrap();
5160
5161        let buf = Bytes::from(buf);
5162
5163        let cases = [
5164            vec![
5165                RowSelector::skip(100),
5166                RowSelector::select(924),
5167                RowSelector::skip(100),
5168                RowSelector::select(924),
5169            ],
5170            vec![
5171                RowSelector::select(924),
5172                RowSelector::skip(100),
5173                RowSelector::select(924),
5174                RowSelector::skip(100),
5175            ],
5176            vec![
5177                RowSelector::skip(1023),
5178                RowSelector::select(1),
5179                RowSelector::skip(1023),
5180                RowSelector::select(1),
5181            ],
5182            vec![
5183                RowSelector::select(1),
5184                RowSelector::skip(1023),
5185                RowSelector::select(1),
5186                RowSelector::skip(1023),
5187            ],
5188        ];
5189
5190        for batch_size in [100, 1024, 2048] {
5191            for selection in &cases {
5192                let selection = RowSelection::from(selection.clone());
5193                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5194                    .unwrap()
5195                    .with_row_selection(selection.clone())
5196                    .with_batch_size(batch_size)
5197                    .build()
5198                    .unwrap();
5199
5200                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5201                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5202                assert_eq!(actual.num_rows(), selection.row_count());
5203
5204                let mut batch_offset = 0;
5205                let mut actual_offset = 0;
5206                for selector in selection.iter() {
5207                    if selector.skip {
5208                        batch_offset += selector.row_count;
5209                        continue;
5210                    }
5211
5212                    assert_eq!(
5213                        batch.slice(batch_offset, selector.row_count),
5214                        actual.slice(actual_offset, selector.row_count)
5215                    );
5216
5217                    batch_offset += selector.row_count;
5218                    actual_offset += selector.row_count;
5219                }
5220            }
5221        }
5222    }
5223
5224    #[test]
5225    fn test_read_old_nested_list() {
5226        use arrow::datatypes::DataType;
5227        use arrow::datatypes::ToByteSlice;
5228
5229        let testdata = arrow::util::test_util::parquet_test_data();
5230        // message my_record {
5231        //     REQUIRED group a (LIST) {
5232        //         REPEATED group array (LIST) {
5233        //             REPEATED INT32 array;
5234        //         }
5235        //     }
5236        // }
5237        // should be read as list<list<int32>>
5238        let path = format!("{testdata}/old_list_structure.parquet");
5239        let test_file = File::open(path).unwrap();
5240
5241        // create expected ListArray
5242        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5243
5244        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
5245        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5246
5247        // Construct a list array from the above two
5248        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5249            "array",
5250            DataType::Int32,
5251            false,
5252        ))))
5253        .len(2)
5254        .add_buffer(a_value_offsets)
5255        .add_child_data(a_values.into_data())
5256        .build()
5257        .unwrap();
5258        let a = ListArray::from(a_list_data);
5259
5260        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5261        let mut reader = builder.build().unwrap();
5262        let out = reader.next().unwrap().unwrap();
5263        assert_eq!(out.num_rows(), 1);
5264        assert_eq!(out.num_columns(), 1);
5265        // grab first column
5266        let c0 = out.column(0);
5267        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5268        // get first row: [[1, 2], [3, 4]]
5269        let r0 = c0arr.value(0);
5270        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5271        assert_eq!(r0arr, &a);
5272    }
5273
5274    #[test]
5275    fn test_map_no_value() {
5276        // File schema:
5277        // message schema {
5278        //   required group my_map (MAP) {
5279        //     repeated group key_value {
5280        //       required int32 key;
5281        //       optional int32 value;
5282        //     }
5283        //   }
5284        //   required group my_map_no_v (MAP) {
5285        //     repeated group key_value {
5286        //       required int32 key;
5287        //     }
5288        //   }
5289        //   required group my_list (LIST) {
5290        //     repeated group list {
5291        //       required int32 element;
5292        //     }
5293        //   }
5294        // }
5295        let testdata = arrow::util::test_util::parquet_test_data();
5296        let path = format!("{testdata}/map_no_value.parquet");
5297        let file = File::open(path).unwrap();
5298
5299        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5300            .unwrap()
5301            .build()
5302            .unwrap();
5303        let out = reader.next().unwrap().unwrap();
5304        assert_eq!(out.num_rows(), 3);
5305        assert_eq!(out.num_columns(), 3);
5306        // my_map_no_v and my_list columns should now be equivalent
5307        let c0 = out.column(1).as_list::<i32>();
5308        let c1 = out.column(2).as_list::<i32>();
5309        assert_eq!(c0.len(), c1.len());
5310        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5311    }
5312
5313    #[test]
5314    fn test_row_filter_full_page_skip_is_handled() {
5315        let first_value: i64 = 1111;
5316        let last_value: i64 = 9999;
5317        let num_rows: usize = 12;
5318
5319        // Build data whose pages are laid out as (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999).
5320        // The filter below then produces the row selection [select 1, skip 10, select 1],
5321        // i.e. only the first (1111) and last (9999) rows survive.
5322        let schema = Arc::new(Schema::new(vec![
5323            Field::new("key", arrow_schema::DataType::Int64, false),
5324            Field::new("value", arrow_schema::DataType::Int64, false),
5325        ]));
5326
5327        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
5328        int_values[0] = first_value;
5329        int_values[num_rows - 1] = last_value;
5330        let keys = Int64Array::from(int_values.clone());
5331        let values = Int64Array::from(int_values.clone());
5332        let batch = RecordBatch::try_new(
5333            Arc::clone(&schema),
5334            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
5335        )
5336        .unwrap();
5337
5338        let props = WriterProperties::builder()
5339            .set_write_batch_size(2)
5340            .set_data_page_row_count_limit(2)
5341            .build();
5342
5343        let mut buffer = Vec::new();
5344        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
5345        writer.write(&batch).unwrap();
5346        writer.close().unwrap();
5347        let data = Bytes::from(buffer);
5348
5349        let options = ArrowReaderOptions::new().with_page_index(true);
5350        let builder =
5351            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
5352        let schema = builder.parquet_schema().clone();
5353        let filter_mask = ProjectionMask::leaves(&schema, [0]);
5354
5355        let make_predicate = |mask: ProjectionMask| {
5356            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
5357                let column = batch.column(0);
5358                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
5359                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
5360                or(&match_first, &match_second)
5361            })
5362        };
5363
5364        let options = ArrowReaderOptions::new().with_page_index(true);
5365        let predicate = make_predicate(filter_mask.clone());
5366
5367        // The batch size is set to 12 so all surviving rows are read in one go after filtering.
5368        // If the reader applied the filter with a bitmask, it could panic because the middle 4 pages are never decoded.
5369        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
5370            .unwrap()
5371            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
5372            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
5373            .with_batch_size(12)
5374            .build()
5375            .unwrap();
5376
5377        // Predicate pruning used to panic once mask-backed plans removed whole pages.
5378        // Collecting into batches validates that the plan now downgrades to selectors instead.
5379        let schema = reader.schema().clone();
5380        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5381        let result = concat_batches(&schema, &batches).unwrap();
5382        assert_eq!(result.num_rows(), 2);
5383    }
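
    // A minimal sketch (not part of the original suite) of the simplest RowFilter
    // usage, reusing the `eq` kernel from the test above: keep only the rows where
    // `key == 3` and check the `value` column of the surviving row.
    #[test]
    fn test_row_filter_basic_sketch() {
        let keys = Int64Array::from(vec![1, 2, 3, 4]);
        let values = Int64Array::from(vec![10, 20, 30, 40]);
        let batch = RecordBatch::try_from_iter([
            ("key", Arc::new(keys) as ArrayRef),
            ("value", Arc::new(values) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        // The predicate only needs the `key` column decoded
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
        let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
            eq(batch.column(0), &Int64Array::new_scalar(3))
        });
        let mut reader = builder
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .build()
            .unwrap();

        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(
            out.column(1).as_primitive::<types::Int64Type>().values(),
            &[30]
        );
    }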
5384
5385    #[test]
5386    fn test_get_row_group_column_bloom_filter_with_length() {
5387        // rewrite the data into a new parquet file whose metadata includes bloom_filter_length
5388        let testdata = arrow::util::test_util::parquet_test_data();
5389        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5390        let file = File::open(path).unwrap();
5391        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5392        let schema = builder.schema().clone();
5393        let reader = builder.build().unwrap();
5394
5395        let mut parquet_data = Vec::new();
5396        let props = WriterProperties::builder()
5397            .set_bloom_filter_enabled(true)
5398            .build();
5399        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
5400        for batch in reader {
5401            let batch = batch.unwrap();
5402            writer.write(&batch).unwrap();
5403        }
5404        writer.close().unwrap();
5405
5406        // test the new parquet file
5407        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
5408    }
5409
5410    #[test]
5411    fn test_get_row_group_column_bloom_filter_without_length() {
5412        let testdata = arrow::util::test_util::parquet_test_data();
5413        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
5414        let data = Bytes::from(std::fs::read(path).unwrap());
5415        test_get_row_group_column_bloom_filter(data, false);
5416    }
5417
5418    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
5419        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
5420
5421        let metadata = builder.metadata();
5422        assert_eq!(metadata.num_row_groups(), 1);
5423        let row_group = metadata.row_group(0);
5424        let column = row_group.column(0);
5425        assert_eq!(column.bloom_filter_length().is_some(), with_length);
5426
5427        let sbbf = builder
5428            .get_row_group_column_bloom_filter(0, 0)
5429            .unwrap()
5430            .unwrap();
5431        assert!(sbbf.check(&"Hello"));
5432        assert!(!sbbf.check(&"Hello_Not_Exists"));
5433    }
5434
5435    #[test]
5436    fn test_read_unknown_logical_type() {
5437        let testdata = arrow::util::test_util::parquet_test_data();
5438        let path = format!("{testdata}/unknown-logical-type.parquet");
5439        let test_file = File::open(path).unwrap();
5440
5441        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5442            .expect("Error creating reader builder");
5443
5444        let schema = builder.metadata().file_metadata().schema_descr();
5445        assert_eq!(
5446            schema.column(0).logical_type_ref(),
5447            Some(&LogicalType::String)
5448        );
5449        assert_eq!(
5450            schema.column(1).logical_type_ref(),
5451            Some(&LogicalType::_Unknown { field_id: 2555 })
5452        );
5453        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5454
5455        let mut reader = builder.build().unwrap();
5456        let out = reader.next().unwrap().unwrap();
5457        assert_eq!(out.num_rows(), 3);
5458        assert_eq!(out.num_columns(), 2);
5459    }
5460
5461    #[test]
5462    fn test_read_row_numbers() {
5463        let file = write_parquet_from_iter(vec![(
5464            "value",
5465            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5466        )]);
5467        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5468
5469        let row_number_field = Arc::new(
5470            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5471        );
5472
5473        let options = ArrowReaderOptions::new()
5474            .with_schema(Arc::new(Schema::new(supplied_fields)))
5475            .with_virtual_columns(vec![row_number_field.clone()])
5476            .unwrap();
5477        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5478            file.try_clone().unwrap(),
5479            options,
5480        )
5481        .expect("reader builder with schema")
5482        .build()
5483        .expect("reader with schema");
5484
5485        let batch = arrow_reader.next().unwrap().unwrap();
5486        let schema = Arc::new(Schema::new(vec![
5487            Field::new("value", ArrowDataType::Int64, false),
5488            (*row_number_field).clone(),
5489        ]));
5490
5491        assert_eq!(batch.schema(), schema);
5492        assert_eq!(batch.num_columns(), 2);
5493        assert_eq!(batch.num_rows(), 3);
5494        assert_eq!(
5495            batch
5496                .column(0)
5497                .as_primitive::<types::Int64Type>()
5498                .iter()
5499                .collect::<Vec<_>>(),
5500            vec![Some(1), Some(2), Some(3)]
5501        );
5502        assert_eq!(
5503            batch
5504                .column(1)
5505                .as_primitive::<types::Int64Type>()
5506                .iter()
5507                .collect::<Vec<_>>(),
5508            vec![Some(0), Some(1), Some(2)]
5509        );
5510    }
5511
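    // Projects away every physical column and reads only the virtual row number column.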
5512    #[test]
5513    fn test_read_only_row_numbers() {
5514        let file = write_parquet_from_iter(vec![(
5515            "value",
5516            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5517        )]);
5518        let row_number_field = Arc::new(
5519            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5520        );
5521        let options = ArrowReaderOptions::new()
5522            .with_virtual_columns(vec![row_number_field.clone()])
5523            .unwrap();
5524        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5525        let num_columns = metadata
5526            .metadata
5527            .file_metadata()
5528            .schema_descr()
5529            .num_columns();
5530
5531        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5532            .with_projection(ProjectionMask::none(num_columns))
5533            .build()
5534            .expect("reader with schema");
5535
5536        let batch = arrow_reader.next().unwrap().unwrap();
5537        let schema = Arc::new(Schema::new(vec![row_number_field]));
5538
5539        assert_eq!(batch.schema(), schema);
5540        assert_eq!(batch.num_columns(), 1);
5541        assert_eq!(batch.num_rows(), 3);
5542        assert_eq!(
5543            batch
5544                .column(0)
5545                .as_primitive::<types::Int64Type>()
5546                .iter()
5547                .collect::<Vec<_>>(),
5548            vec![Some(0), Some(1), Some(2)]
5549        );
5550    }
5551
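    // Row numbers reflect a row's position in the file, so selecting row groups out
    // of order must still report each row's original file position.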
5552    #[test]
5553    fn test_read_row_numbers_row_group_order() -> Result<()> {
5554        // Make a parquet file with 100 rows split across 2 row groups
5555        let array = Int64Array::from_iter_values(5000..5100);
5556        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5557        let mut buffer = Vec::new();
5558        let options = WriterProperties::builder()
5559            .set_max_row_group_size(50)
5560            .build();
5561        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5562        // write in 10-row batches, as the row group size limit is only enforced after each batch
5563        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5564            writer.write(&batch_chunk)?;
5565        }
5566        writer.close()?;
5567
5568        let row_number_field = Arc::new(
5569            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5570        );
5571
5572        let buffer = Bytes::from(buffer);
5573
5574        let options =
5575            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5576
5577        // Read with the row groups in their natural (file) order
5578        let arrow_reader =
5579            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5580                .build()?;
5581
5582        assert_eq!(
5583            ValuesAndRowNumbers {
5584                values: (5000..5100).collect(),
5585                row_numbers: (0..100).collect()
5586            },
5587            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5588        );
5589
5590        // Now read with the row groups out of order
5591        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5592            .with_row_groups(vec![1, 0])
5593            .build()?;
5594
5595        assert_eq!(
5596            ValuesAndRowNumbers {
5597                values: (5050..5100).chain(5000..5050).collect(),
5598                row_numbers: (50..100).chain(0..50).collect(),
5599            },
5600            ValuesAndRowNumbers::new_from_reader(arrow_reader)
5601        );
5602
5603        Ok(())
5604    }
5605
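    // Collects the `col` values and the `row_number` column produced by a reader
    // so the two can be compared as a unit.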
5606    #[derive(Debug, PartialEq)]
5607    struct ValuesAndRowNumbers {
5608        values: Vec<i64>,
5609        row_numbers: Vec<i64>,
5610    }
5611    impl ValuesAndRowNumbers {
5612        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5613            let mut values = vec![];
5614            let mut row_numbers = vec![];
5615            for batch in reader {
5616                let batch = batch.expect("Could not read batch");
5617                values.extend(
5618                    batch
5619                        .column_by_name("col")
5620                        .expect("Could not get col column")
5621                        .as_primitive::<arrow::datatypes::Int64Type>()
5622                        .iter()
5623                        .map(|v| v.expect("Could not get value")),
5624                );
5625
5626                row_numbers.extend(
5627                    batch
5628                        .column_by_name("row_number")
5629                        .expect("Could not get row_number column")
5630                        .as_primitive::<arrow::datatypes::Int64Type>()
5631                        .iter()
5632                        .map(|v| v.expect("Could not get row number"))
5633                        .collect::<Vec<_>>(),
5634                );
5635            }
5636            Self {
5637                values,
5638                row_numbers,
5639            }
5640        }
5641    }
5642
5643    #[test]
5644    fn test_with_virtual_columns_rejects_non_virtual_fields() {
5645        // Try to pass a regular field (not a virtual column) to with_virtual_columns
5646        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5647        assert_eq!(
5648            ArrowReaderOptions::new()
5649                .with_virtual_columns(vec![regular_field])
5650                .unwrap_err()
5651                .to_string(),
5652            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5653        );
5654    }
5655
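    // Randomized end-to-end check that row numbers stay correct under a row
    // selection spanning multiple row groups.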
5656    #[test]
5657    fn test_row_numbers_with_multiple_row_groups() {
5658        test_row_numbers_with_multiple_row_groups_helper(
5659            false,
5660            |path, selection, _row_filter, batch_size| {
5661                let file = File::open(path).unwrap();
5662                let row_number_field = Arc::new(
5663                    Field::new("row_number", ArrowDataType::Int64, false)
5664                        .with_extension_type(RowNumber),
5665                );
5666                let options = ArrowReaderOptions::new()
5667                    .with_virtual_columns(vec![row_number_field])
5668                    .unwrap();
5669                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5670                    .unwrap()
5671                    .with_row_selection(selection)
5672                    .with_batch_size(batch_size)
5673                    .build()
5674                    .expect("Could not create reader");
5675                reader
5676                    .collect::<Result<Vec<_>, _>>()
5677                    .expect("Could not read")
5678            },
5679        );
5680    }
5681
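    // As above, but additionally applies a random row filter during decoding.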
5682    #[test]
5683    fn test_row_numbers_with_multiple_row_groups_and_filter() {
5684        test_row_numbers_with_multiple_row_groups_helper(
5685            true,
5686            |path, selection, row_filter, batch_size| {
5687                let file = File::open(path).unwrap();
5688                let row_number_field = Arc::new(
5689                    Field::new("row_number", ArrowDataType::Int64, false)
5690                        .with_extension_type(RowNumber),
5691                );
5692                let options = ArrowReaderOptions::new()
5693                    .with_virtual_columns(vec![row_number_field])
5694                    .unwrap();
5695                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5696                    .unwrap()
5697                    .with_row_selection(selection)
5698                    .with_batch_size(batch_size)
5699                    .with_row_filter(row_filter.expect("No filter"))
5700                    .build()
5701                    .expect("Could not create reader");
5702                reader
5703                    .collect::<Result<Vec<_>, _>>()
5704                    .expect("Could not read")
5705            },
5706        );
5707    }
5708
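    // Generates a file where each value equals its row number plus one, builds a
    // random RowSelection (and optionally a random RowFilter), runs `test_case`,
    // and asserts that the returned row numbers remain consistent with the values.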
5709    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5710        use_filter: bool,
5711        test_case: F,
5712    ) where
5713        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5714    {
5715        let seed: u64 = random();
5716        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
5717        let mut rng = StdRng::seed_from_u64(seed);
5718
5719        use tempfile::TempDir;
5720        let tempdir = TempDir::new().expect("Could not create temp dir");
5721
5722        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5723
5724        let path = tempdir.path().join("test.parquet");
5725        std::fs::write(&path, bytes).expect("Could not write file");
5726
5727        let mut case = vec![];
5728        let mut remaining = metadata.file_metadata().num_rows();
5729        while remaining > 0 {
5730            let row_count = rng.random_range(1..=remaining);
5731            remaining -= row_count;
5732            case.push(RowSelector {
5733                row_count: row_count as usize,
5734                skip: rng.random_bool(0.5),
5735            });
5736        }
5737
5738        let filter = use_filter.then(|| {
5739            let filter = (0..metadata.file_metadata().num_rows())
5740                .map(|_| rng.random_bool(0.99))
5741                .collect::<Vec<_>>();
5742            let mut filter_offset = 0;
5743            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5744                ProjectionMask::all(),
5745                move |b| {
5746                    let array = BooleanArray::from_iter(
5747                        filter
5748                            .iter()
5749                            .skip(filter_offset)
5750                            .take(b.num_rows())
5751                            .map(|x| Some(*x)),
5752                    );
5753                    filter_offset += b.num_rows();
5754                    Ok(array)
5755                },
5756            ))])
5757        });
5758
5759        let selection = RowSelection::from(case);
5760        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5761
5762        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5763            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5764            return;
5765        }
5766        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5767            .expect("Failed to concatenate");
5768        // Note: `selection.row_count()` is not compared against `actual.num_rows()` because the optional row filter can drop selected rows.
5769        let values = actual
5770            .column(0)
5771            .as_primitive::<types::Int64Type>()
5772            .iter()
5773            .collect::<Vec<_>>();
5774        let row_numbers = actual
5775            .column(1)
5776            .as_primitive::<types::Int64Type>()
5777            .iter()
5778            .collect::<Vec<_>>();
5779        assert_eq!(
5780            row_numbers
5781                .into_iter()
5782                .map(|number| number.map(|number| number + 1))
5783                .collect::<Vec<_>>(),
5784            values
5785        );
5786    }
5787
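    // Writes values 1..=N (N random) in randomly sized batches, flushing after each
    // batch so the file usually ends up with several row groups.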
5788    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5789        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5790            "value",
5791            ArrowDataType::Int64,
5792            false,
5793        )])));
5794
5795        let mut buf = Vec::with_capacity(1024);
5796        let mut writer =
5797            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5798
5799        let mut values = 1..=rng.random_range(1..4096);
5800        while !values.is_empty() {
5801            let batch_values = values
5802                .by_ref()
5803                .take(rng.random_range(1..4096))
5804                .collect::<Vec<_>>();
5805            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5806            let batch =
5807                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5808            writer.write(&batch).expect("Could not write batch");
5809            writer.flush().expect("Could not flush");
5810        }
5811        let metadata = writer.close().expect("Could not close writer");
5812
5813        (Bytes::from(buf), metadata)
5814    }
5815}