parquet/arrow/arrow_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains a reader that reads parquet data into arrow [`RecordBatch`]es
19
20use std::collections::VecDeque;
21use std::sync::Arc;
22
23use arrow_array::cast::AsArray;
24use arrow_array::Array;
25use arrow_array::{RecordBatch, RecordBatchReader};
26use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
27use arrow_select::filter::prep_null_mask_filter;
28pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
29pub use selection::{RowSelection, RowSelector};
30
31pub use crate::arrow::array_reader::RowGroups;
32use crate::arrow::array_reader::{build_array_reader, ArrayReader};
33use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
34use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
35use crate::column::page::{PageIterator, PageReader};
36use crate::errors::{ParquetError, Result};
37use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
38use crate::file::reader::{ChunkReader, SerializedPageReader};
39use crate::schema::types::SchemaDescriptor;
40
41mod filter;
42mod selection;
43pub mod statistics;
44
45/// Builder for constructing readers that decode parquet data into arrow [`RecordBatch`]es.
46///
47/// Most users should use one of the following specializations:
48///
49/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
50/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
51///
52/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
53pub struct ArrowReaderBuilder<T> {
54    pub(crate) input: T,
55
56    pub(crate) metadata: Arc<ParquetMetaData>,
57
58    pub(crate) schema: SchemaRef,
59
60    pub(crate) fields: Option<Arc<ParquetField>>,
61
62    pub(crate) batch_size: usize,
63
64    pub(crate) row_groups: Option<Vec<usize>>,
65
66    pub(crate) projection: ProjectionMask,
67
68    pub(crate) filter: Option<RowFilter>,
69
70    pub(crate) selection: Option<RowSelection>,
71
72    pub(crate) limit: Option<usize>,
73
74    pub(crate) offset: Option<usize>,
75}
76
77impl<T> ArrowReaderBuilder<T> {
78    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
79        Self {
80            input,
81            metadata: metadata.metadata,
82            schema: metadata.schema,
83            fields: metadata.fields,
84            batch_size: 1024,
85            row_groups: None,
86            projection: ProjectionMask::all(),
87            filter: None,
88            selection: None,
89            limit: None,
90            offset: None,
91        }
92    }
93
94    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
95    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
96        &self.metadata
97    }
98
99    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
100    pub fn parquet_schema(&self) -> &SchemaDescriptor {
101        self.metadata.file_metadata().schema_descr()
102    }
103
104    /// Returns the arrow [`SchemaRef`] for this parquet file
105    pub fn schema(&self) -> &SchemaRef {
106        &self.schema
107    }
108
109    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
110    /// If `batch_size` is larger than the number of rows in the file, the file's row count is used instead.
111    pub fn with_batch_size(self, batch_size: usize) -> Self {
112        // Try to avoid allocating a large buffer
113        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
114        Self { batch_size, ..self }
115    }
116
117    /// Only read data from the provided row group indexes
118    ///
119    /// This is also called row group filtering
120    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
121        Self {
122            row_groups: Some(row_groups),
123            ..self
124        }
125    }
126
127    /// Only read data from the columns selected by the provided [`ProjectionMask`]
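    ///
    /// A minimal sketch of projecting two leaf columns by index (the file path
    /// and column indices below are hypothetical):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Keep only the leaf columns at indices 0 and 2
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
    /// let mut reader = builder.with_projection(mask).build().unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```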
128    pub fn with_projection(self, mask: ProjectionMask) -> Self {
129        Self {
130            projection: mask,
131            ..self
132        }
133    }
134
135    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
136    /// data into memory.
137    ///
138    /// This feature is used to restrict which rows are decoded within row
139    /// groups, skipping ranges of rows that are not needed. Such selections
140    /// could be determined by evaluating predicates against the parquet page
141    /// [`Index`] or some other external information available to a query
142    /// engine.
143    ///
144    /// # Notes
145    ///
146    /// Row group filtering (see [`Self::with_row_groups`]) is applied prior to
147    /// applying the row selection, and therefore rows from skipped row groups
148    /// should not be included in the [`RowSelection`] (see example below)
149    ///
150    /// It is recommended to enable writing the page index if using this
151    /// functionality, to allow more efficient skipping over data pages. See
152    /// [`ArrowReaderOptions::with_page_index`].
153    ///
154    /// # Example
155    ///
156    /// Given a parquet file with 4 row groups, and a row group filter of `[0,
157    /// 2, 3]`, in order to scan rows 50-100 in row group 2 and rows 200-300 in
158    /// row group 3:
159    ///
160    /// ```text
161    ///   Row Group 0, 1000 rows (selected)
162    ///   Row Group 1, 1000 rows (skipped)
163    ///   Row Group 2, 1000 rows (selected, but want to only scan rows 50-100)
164    ///   Row Group 3, 1000 rows (selected, but want to only scan rows 200-300)
165    /// ```
166    ///
167    /// You could pass the following [`RowSelection`]:
168    ///
169    /// ```text
170    ///  Select 1000    (scan all rows in row group 0)
171    ///  Skip 50        (skip the first 50 rows in row group 2)
172    ///  Select 50      (scan rows 50-100 in row group 2)
173    ///  Skip 900       (skip the remaining rows in row group 2)
174    ///  Skip 200       (skip the first 200 rows in row group 3)
175    ///  Select 100     (scan rows 200-300 in row group 3)
176    ///  Skip 700       (skip the remaining rows in row group 3)
177    /// ```
178    /// Note there is no entry for the (entirely) skipped row group 1.
179    ///
180    /// Note you can represent the same selection with fewer entries. Instead of
181    ///
182    /// ```text
183    ///  Skip 900       (skip the remaining rows in row group 2)
184    ///  Skip 200       (skip the first 200 rows in row group 3)
185    /// ```
186    ///
187    /// you could use
188    ///
189    /// ```text
190    /// Skip 1100      (skip the remaining 900 rows in row group 2 and the first 200 rows in row group 3)
191    /// ```
192    ///
193    /// [`Index`]: crate::file::page_index::index::Index
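    ///
    /// A minimal sketch of constructing the selection described above, using
    /// the shorter form with the combined skip:
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(1000), // scan all rows in row group 0
    ///     RowSelector::skip(50),     // skip the first 50 rows in row group 2
    ///     RowSelector::select(50),   // scan rows 50-100 in row group 2
    ///     RowSelector::skip(1100),   // skip the rest of row group 2 and the first 200 rows of row group 3
    ///     RowSelector::select(100),  // scan rows 200-300 in row group 3
    ///     RowSelector::skip(700),    // skip the remaining rows in row group 3
    /// ]);
    /// ```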
194    pub fn with_row_selection(self, selection: RowSelection) -> Self {
195        Self {
196            selection: Some(selection),
197            ..self
198        }
199    }
200
201    /// Provide a [`RowFilter`] to skip decoding rows
202    ///
203    /// Row filters are applied after row group selection and row selection
204    ///
205    /// It is recommended to enable reading the page index if using this functionality, to allow
206    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`].
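    ///
    /// A minimal sketch (the file path is hypothetical, and the predicate shown
    /// simply keeps every row; a real predicate would compute a `BooleanArray`
    /// from the batch's columns):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_array::BooleanArray;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Only decode the first leaf column when evaluating the predicate
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    ///     Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    /// });
    /// let mut reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```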
207    pub fn with_row_filter(self, filter: RowFilter) -> Self {
208        Self {
209            filter: Some(filter),
210            ..self
211        }
212    }
213
214    /// Provide a limit to the number of rows to be read
215    ///
216    /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
217    /// allowing it to limit the final set of rows decoded after any pushed down predicates
218    ///
219    /// It is recommended to enable reading the page index if using this functionality, to allow
220    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
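    ///
    /// A minimal sketch, combined with [`Self::with_offset`] (the file path is
    /// hypothetical):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// // Skip the first 25 remaining rows, then read at most 100 rows
    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_offset(25)
    ///     .with_limit(100)
    ///     .build()
    ///     .unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```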
221    pub fn with_limit(self, limit: usize) -> Self {
222        Self {
223            limit: Some(limit),
224            ..self
225        }
226    }
227
228    /// Provide an offset to skip over the given number of rows
229    ///
230    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
231    /// allowing it to skip rows after any pushed down predicates
232    ///
233    /// It is recommended to enable reading the page index if using this functionality, to allow
234    /// more efficient skipping over data pages. See [`ArrowReaderOptions::with_page_index`]
235    pub fn with_offset(self, offset: usize) -> Self {
236        Self {
237            offset: Some(offset),
238            ..self
239        }
240    }
241}
242
243/// Options that control how metadata is read for a parquet file
244///
245/// See [`ArrowReaderBuilder`] for how to configure how the column data
246/// is then read from the file, including projection and filter pushdown
247#[derive(Debug, Clone, Default)]
248pub struct ArrowReaderOptions {
249    /// Should the reader strip any user-defined metadata from the Arrow schema
250    skip_arrow_metadata: bool,
251    /// If provided, this is used as the schema for the file; otherwise the schema is read from the file
252    supplied_schema: Option<SchemaRef>,
253    /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
254    pub(crate) page_index: bool,
255}
256
257impl ArrowReaderOptions {
258    /// Create a new [`ArrowReaderOptions`] with the default settings
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Skip decoding the embedded arrow metadata (defaults to `false`)
264    ///
265    /// Parquet files generated by some writers may contain embedded arrow
266    /// schema and metadata.
267    /// This may not be correct or compatible with your system,
268    /// for example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
269    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
270        Self {
271            skip_arrow_metadata,
272            ..self
273        }
274    }
275
276    /// Provide a schema to use when reading the parquet file. If provided it
277    /// takes precedence over the schema inferred from the file or the schema defined
278    /// in the file's metadata. If the schema is not compatible with the file's
279    /// schema an error will be returned when constructing the builder.
280    ///
281    /// This option is only required if you want to cast columns to a different type.
282    /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
283    /// in the Arrow schema.
284    ///
285    /// The supplied schema must have the same number of columns as the parquet schema, and
286    /// the column names must match.
287    ///
288    /// # Example
289    /// ```
291    /// use std::sync::Arc;
292    /// use tempfile::tempfile;
293    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
294    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
295    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
296    /// use parquet::arrow::ArrowWriter;
297    ///
298    /// // Write data - schema is inferred from the data to be Int32
299    /// let file = tempfile().unwrap();
300    /// let batch = RecordBatch::try_from_iter(vec![
301    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
302    /// ]).unwrap();
303    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
304    /// writer.write(&batch).unwrap();
305    /// writer.close().unwrap();
306    ///
307    /// // Read the file back.
308    /// // Supply a schema that interprets the Int32 column as a Timestamp.
309    /// let supplied_schema = Arc::new(Schema::new(vec![
310    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
311    /// ]));
312    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
313    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
314    ///     file.try_clone().unwrap(),
315    ///     options
316    /// ).expect("supplied schema should be compatible with the parquet file schema");
317    ///
318    /// // Create the reader and read the data using the supplied schema.
319    /// let mut reader = builder.build().unwrap();
320    /// let _batch = reader.next().unwrap().unwrap();
321    /// ```
322    pub fn with_schema(self, schema: SchemaRef) -> Self {
323        Self {
324            supplied_schema: Some(schema),
325            skip_arrow_metadata: true,
326            ..self
327        }
328    }
329
330    /// Enable reading [`PageIndex`], if present (defaults to `false`)
331    ///
332    /// The `PageIndex` can be used by some query engines to push down predicates to the
333    /// parquet scan, potentially eliminating unnecessary IO.
334    ///
335    /// If this is enabled, [`ParquetMetaData::column_index`] and
336    /// [`ParquetMetaData::offset_index`] will be populated if the corresponding
337    /// information is present in the file.
338    ///
339    /// [`PageIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
340    /// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
341    /// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
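    ///
    /// A minimal sketch (the file path is hypothetical):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// let file = File::open("data.parquet").unwrap();
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    /// // If the file contains a page index, it is now available via the metadata
    /// let _column_index = builder.metadata().column_index();
    /// ```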
342    pub fn with_page_index(self, page_index: bool) -> Self {
343        Self { page_index, ..self }
344    }
345}
346
347/// The metadata necessary to construct a [`ArrowReaderBuilder`]
348///
349/// Note this structure is cheaply clone-able as it consists of several `Arc`s.
350///
351/// This structure allows
352///
353/// 1. Loading metadata for a file once and then using that same metadata to
354///    construct multiple separate readers, for example, to distribute readers
355///    across multiple threads
356///
357/// 2. Using a cached copy of the [`ParquetMetaData`] rather than reading it
358///    from the file each time a reader is constructed.
359///
360/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
361#[derive(Debug, Clone)]
362pub struct ArrowReaderMetadata {
363    /// The Parquet Metadata, if known a priori
364    pub(crate) metadata: Arc<ParquetMetaData>,
365    /// The Arrow Schema
366    pub(crate) schema: SchemaRef,
367
368    pub(crate) fields: Option<Arc<ParquetField>>,
369}
370
371impl ArrowReaderMetadata {
372    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
373    ///
374    /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
375    /// example of how this can be used
376    ///
377    /// # Notes
378    ///
379    /// If `options` has [`ArrowReaderOptions::with_page_index`] set to true, but
380    /// the metadata is missing the page index, this function will attempt
381    /// to load the page index by reading additional data from the provided reader.
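    ///
    /// A minimal sketch (the file path is hypothetical):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// let file = File::open("data.parquet").unwrap();
    /// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new()).unwrap();
    /// println!("row groups: {}", metadata.metadata().num_row_groups());
    /// ```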
382    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
383        let metadata = ParquetMetaDataReader::new()
384            .with_page_indexes(options.page_index)
385            .parse_and_finish(reader)?;
386        Self::try_new(Arc::new(metadata), options)
387    }
388
389    /// Create a new [`ArrowReaderMetadata`]
390    ///
391    /// # Notes
392    ///
393    /// This function does not attempt to load the PageIndex if not present in the metadata.
394    /// See [`Self::load`] for more details.
395    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
396        match options.supplied_schema {
397            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
398            None => {
399                let kv_metadata = match options.skip_arrow_metadata {
400                    true => None,
401                    false => metadata.file_metadata().key_value_metadata(),
402                };
403
404                let (schema, fields) = parquet_to_arrow_schema_and_fields(
405                    metadata.file_metadata().schema_descr(),
406                    ProjectionMask::all(),
407                    kv_metadata,
408                )?;
409
410                Ok(Self {
411                    metadata,
412                    schema: Arc::new(schema),
413                    fields: fields.map(Arc::new),
414                })
415            }
416        }
417    }
418
419    fn with_supplied_schema(
420        metadata: Arc<ParquetMetaData>,
421        supplied_schema: SchemaRef,
422    ) -> Result<Self> {
423        let parquet_schema = metadata.file_metadata().schema_descr();
424        let field_levels = parquet_to_arrow_field_levels(
425            parquet_schema,
426            ProjectionMask::all(),
427            Some(supplied_schema.fields()),
428        )?;
429        let fields = field_levels.fields;
430        let inferred_len = fields.len();
431        let supplied_len = supplied_schema.fields().len();
432        // Ensure the supplied schema has the same number of columns as the parquet schema.
433        // parquet_to_arrow_field_levels is expected to return an error if the schemas have
434        // different lengths, but we check here to be safe.
435        if inferred_len != supplied_len {
436            Err(arrow_err!(format!(
437                "incompatible arrow schema, expected {} columns received {}",
438                inferred_len, supplied_len
439            )))
440        } else {
441            let diff_fields: Vec<_> = supplied_schema
442                .fields()
443                .iter()
444                .zip(fields.iter())
445                .filter_map(|(field1, field2)| {
446                    if field1 != field2 {
447                        Some(field1.name().clone())
448                    } else {
449                        None
450                    }
451                })
452                .collect();
453
454            if !diff_fields.is_empty() {
455                Err(ParquetError::ArrowError(format!(
456                    "incompatible arrow schema, the following fields could not be cast: [{}]",
457                    diff_fields.join(", ")
458                )))
459            } else {
460                Ok(Self {
461                    metadata,
462                    schema: supplied_schema,
463                    fields: field_levels.levels.map(Arc::new),
464                })
465            }
466        }
467    }
468
469    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
470    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
471        &self.metadata
472    }
473
474    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
475    pub fn parquet_schema(&self) -> &SchemaDescriptor {
476        self.metadata.file_metadata().schema_descr()
477    }
478
479    /// Returns the arrow [`SchemaRef`] for this parquet file
480    pub fn schema(&self) -> &SchemaRef {
481        &self.schema
482    }
483}
484
485#[doc(hidden)]
486/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async readers
487pub struct SyncReader<T: ChunkReader>(T);
488
489/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
490///
491/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
492///
493/// See [`ArrowReaderBuilder`] for additional member functions
494pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
495
496impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
497    /// Create a new [`ParquetRecordBatchReaderBuilder`]
498    ///
499    /// ```
500    /// # use std::sync::Arc;
501    /// # use bytes::Bytes;
502    /// # use arrow_array::{Int32Array, RecordBatch};
503    /// # use arrow_schema::{DataType, Field, Schema};
504    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
505    /// # use parquet::arrow::ArrowWriter;
506    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
507    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
508    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
509    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
510    /// # writer.write(&batch).unwrap();
511    /// # writer.close().unwrap();
512    /// # let file = Bytes::from(file);
513    /// #
514    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
515    ///
516    /// // Inspect metadata
517    /// assert_eq!(builder.metadata().num_row_groups(), 1);
518    ///
519    /// // Construct reader
520    /// let mut reader: ParquetRecordBatchReader = builder.with_row_groups(vec![0]).build().unwrap();
521    ///
522    /// // Read data
523    /// let _batch = reader.next().unwrap().unwrap();
524    /// ```
525    pub fn try_new(reader: T) -> Result<Self> {
526        Self::try_new_with_options(reader, Default::default())
527    }
528
529    /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
530    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
531        let metadata = ArrowReaderMetadata::load(&reader, options)?;
532        Ok(Self::new_with_metadata(reader, metadata))
533    }
534
535    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
536    ///
537    /// This interface allows:
538    ///
539    /// 1. Loading metadata once and using it to create multiple builders with
540    ///    potentially different settings or run on different threads
541    ///
542    /// 2. Using a cached copy of the metadata rather than re-reading it from the
543    ///    file each time a reader is constructed.
544    ///
545    /// See the docs on [`ArrowReaderMetadata`] for more details
546    ///
547    /// # Example
548    /// ```
549    /// # use std::fs::metadata;
550    /// # use std::sync::Arc;
551    /// # use bytes::Bytes;
552    /// # use arrow_array::{Int32Array, RecordBatch};
553    /// # use arrow_schema::{DataType, Field, Schema};
554    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
555    /// # use parquet::arrow::ArrowWriter;
556    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
557    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
558    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
559    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
560    /// # writer.write(&batch).unwrap();
561    /// # writer.close().unwrap();
562    /// # let file = Bytes::from(file);
563    /// #
564    /// let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
565    /// let mut a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone()).build().unwrap();
566    /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build().unwrap();
567    ///
568    /// // Should be able to read from both in parallel
569    /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
570    /// ```
571    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
572        Self::new_builder(SyncReader(input), metadata)
573    }
574
575    /// Build a [`ParquetRecordBatchReader`]
576    ///
577    /// Note: this will eagerly evaluate any `RowFilter` before returning
578    pub fn build(self) -> Result<ParquetRecordBatchReader> {
579        // Try to avoid allocating a large buffer
580        let batch_size = self
581            .batch_size
582            .min(self.metadata.file_metadata().num_rows() as usize);
583
584        let row_groups = self
585            .row_groups
586            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
587
588        let reader = ReaderRowGroups {
589            reader: Arc::new(self.input.0),
590            metadata: self.metadata,
591            row_groups,
592        };
593
594        let mut filter = self.filter;
595        let mut selection = self.selection;
596
597        if let Some(filter) = filter.as_mut() {
598            for predicate in filter.predicates.iter_mut() {
599                if !selects_any(selection.as_ref()) {
600                    break;
601                }
602
603                let array_reader =
604                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
605
606                selection = Some(evaluate_predicate(
607                    batch_size,
608                    array_reader,
609                    selection,
610                    predicate.as_mut(),
611                )?);
612            }
613        }
614
615        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
616
617        // If the selection selects no rows, use an empty selection so no rows are read
618        if !selects_any(selection.as_ref()) {
619            selection = Some(RowSelection::from(vec![]));
620        }
621
622        Ok(ParquetRecordBatchReader::new(
623            batch_size,
624            array_reader,
625            apply_range(selection, reader.num_rows(), self.offset, self.limit),
626        ))
627    }
628}
629
630struct ReaderRowGroups<T: ChunkReader> {
631    reader: Arc<T>,
632
633    metadata: Arc<ParquetMetaData>,
634    /// Optional list of row group indices to scan
635    row_groups: Vec<usize>,
636}
637
638impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
639    fn num_rows(&self) -> usize {
640        let meta = self.metadata.row_groups();
641        self.row_groups
642            .iter()
643            .map(|x| meta[*x].num_rows() as usize)
644            .sum()
645    }
646
647    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
648        Ok(Box::new(ReaderPageIterator {
649            column_idx: i,
650            reader: self.reader.clone(),
651            metadata: self.metadata.clone(),
652            row_groups: self.row_groups.clone().into_iter(),
653        }))
654    }
655}
656
657struct ReaderPageIterator<T: ChunkReader> {
658    reader: Arc<T>,
659    column_idx: usize,
660    row_groups: std::vec::IntoIter<usize>,
661    metadata: Arc<ParquetMetaData>,
662}
663
664impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
665    type Item = Result<Box<dyn PageReader>>;
666
667    fn next(&mut self) -> Option<Self::Item> {
668        let rg_idx = self.row_groups.next()?;
669        let rg = self.metadata.row_group(rg_idx);
670        let meta = rg.column(self.column_idx);
671        let offset_index = self.metadata.offset_index();
672        // `offset_index` may not exist, in which case `i[rg_idx]` will be empty.
673        // To avoid a panic at `i[rg_idx][self.column_idx]`, filter out empty `i[rg_idx]`.
674        let page_locations = offset_index
675            .filter(|i| !i[rg_idx].is_empty())
676            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
677        let total_rows = rg.num_rows() as usize;
678        let reader = self.reader.clone();
679
680        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
681        Some(ret.map(|x| Box::new(x) as _))
682    }
683}
684
685impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
686
687/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]es
688/// read from a parquet data source
689pub struct ParquetRecordBatchReader {
690    batch_size: usize,
691    array_reader: Box<dyn ArrayReader>,
692    schema: SchemaRef,
693    selection: Option<VecDeque<RowSelector>>,
694}
695
696impl Iterator for ParquetRecordBatchReader {
697    type Item = Result<RecordBatch, ArrowError>;
698
699    fn next(&mut self) -> Option<Self::Item> {
700        let mut read_records = 0;
701        match self.selection.as_mut() {
702            Some(selection) => {
703                while read_records < self.batch_size && !selection.is_empty() {
704                    let front = selection.pop_front().unwrap();
705                    if front.skip {
706                        let skipped = match self.array_reader.skip_records(front.row_count) {
707                            Ok(skipped) => skipped,
708                            Err(e) => return Some(Err(e.into())),
709                        };
710
711                        if skipped != front.row_count {
712                            return Some(Err(general_err!(
713                                "failed to skip rows, expected {}, got {}",
714                                front.row_count,
715                                skipped
716                            )
717                            .into()));
718                        }
719                        continue;
720                    }
721
722                    // Currently, a RowSelector with row_count = 0 would be interpreted as the end of the reader.
723                    // The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
724                    if front.row_count == 0 {
725                        continue;
726                    }
727
728                    // try to read record
729                    let need_read = self.batch_size - read_records;
730                    let to_read = match front.row_count.checked_sub(need_read) {
731                        Some(remaining) if remaining != 0 => {
732                            // This selector has more rows than this batch needs: read only what is needed
733                            // and push the remainder back (the `remaining != 0` guard avoids an infinite loop)
734                            selection.push_front(RowSelector::select(remaining));
735                            need_read
736                        }
737                        _ => front.row_count,
738                    };
739                    match self.array_reader.read_records(to_read) {
740                        Ok(0) => break,
741                        Ok(rec) => read_records += rec,
742                        Err(error) => return Some(Err(error.into())),
743                    }
744                }
745            }
746            None => {
747                if let Err(error) = self.array_reader.read_records(self.batch_size) {
748                    return Some(Err(error.into()));
749                }
750            }
751        };
752
753        match self.array_reader.consume_batch() {
754            Err(error) => Some(Err(error.into())),
755            Ok(array) => {
756                let struct_array = array.as_struct_opt().ok_or_else(|| {
757                    ArrowError::ParquetError(
758                        "Struct array reader should return struct array".to_string(),
759                    )
760                });
761
762                match struct_array {
763                    Err(err) => Some(Err(err)),
764                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
765                }
766            }
767        }
768    }
769}
770
771impl RecordBatchReader for ParquetRecordBatchReader {
772    /// Returns the projected [`SchemaRef`] for reading the parquet file.
773    ///
774    /// Note that the schema metadata will be stripped here. See
775    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
776    fn schema(&self) -> SchemaRef {
777        self.schema.clone()
778    }
779}
780
781impl ParquetRecordBatchReader {
782    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
783    ///
784    /// See [`ParquetRecordBatchReaderBuilder`] for more options
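    ///
    /// A minimal sketch (the file path is hypothetical):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// let file = File::open("data.parquet").unwrap();
    /// let mut reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
    /// ```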
785    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
786        ParquetRecordBatchReaderBuilder::try_new(reader)?
787            .with_batch_size(batch_size)
788            .build()
789    }
790
791    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
792    ///
793    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
794    /// higher-level interface for reading parquet data from a file
795    pub fn try_new_with_row_groups(
796        levels: &FieldLevels,
797        row_groups: &dyn RowGroups,
798        batch_size: usize,
799        selection: Option<RowSelection>,
800    ) -> Result<Self> {
801        let array_reader =
802            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
803
804        Ok(Self {
805            batch_size,
806            array_reader,
807            schema: Arc::new(Schema::new(levels.fields.clone())),
808            selection: selection.map(|s| s.trim().into()),
809        })
810    }
811
812    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
813    /// a time from the provided [`ArrayReader`], based on the configured `selection`. If
814    /// `selection` is `None`, all rows will be returned.
815    pub(crate) fn new(
816        batch_size: usize,
817        array_reader: Box<dyn ArrayReader>,
818        selection: Option<RowSelection>,
819    ) -> Self {
820        let schema = match array_reader.get_data_type() {
821            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
822            _ => unreachable!("Struct array reader's data type is not struct!"),
823        };
824
825        Self {
826            batch_size,
827            array_reader,
828            schema: Arc::new(schema),
829            selection: selection.map(|s| s.trim().into()),
830        }
831    }
832}
833
834/// Returns `true` if `selection` is `None` or selects some rows
835pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
836    selection.map(|x| x.selects_any()).unwrap_or(true)
837}
838
839/// Applies an optional offset and limit to an optional [`RowSelection`]
840pub(crate) fn apply_range(
841    mut selection: Option<RowSelection>,
842    row_count: usize,
843    offset: Option<usize>,
844    limit: Option<usize>,
845) -> Option<RowSelection> {
846    // If an offset is defined, apply it to the `selection`
847    if let Some(offset) = offset {
848        selection = Some(match row_count.checked_sub(offset) {
849            None => RowSelection::from(vec![]),
850            Some(remaining) => selection
851                .map(|selection| selection.offset(offset))
852                .unwrap_or_else(|| {
853                    RowSelection::from(vec![
854                        RowSelector::skip(offset),
855                        RowSelector::select(remaining),
856                    ])
857                }),
858        });
859    }
860
861    // If a limit is defined, apply it to the final `selection`
862    if let Some(limit) = limit {
863        selection = Some(
864            selection
865                .map(|selection| selection.limit(limit))
866                .unwrap_or_else(|| {
867                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
868                }),
869        );
870    }
871    selection
872}
873
874/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
875/// which rows to return.
876///
877/// `input_selection`: Optional pre-existing selection. If `Some`, then the
878/// final [`RowSelection`] will be the conjunction of it and the rows selected
879/// by `predicate`.
880///
881/// Note: A pre-existing selection may come from evaluating a previous predicate
882/// or from an explicit [`RowSelection`] specified for the [`ParquetRecordBatchReader`]
883/// in addition to one or more predicates.
884pub(crate) fn evaluate_predicate(
885    batch_size: usize,
886    array_reader: Box<dyn ArrayReader>,
887    input_selection: Option<RowSelection>,
888    predicate: &mut dyn ArrowPredicate,
889) -> Result<RowSelection> {
890    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
891    let mut filters = vec![];
892    for maybe_batch in reader {
893        let maybe_batch = maybe_batch?;
894        let input_rows = maybe_batch.num_rows();
895        let filter = predicate.evaluate(maybe_batch)?;
896        // Since the predicate is user supplied, check the result here to catch bugs quickly
897        if filter.len() != input_rows {
898            return Err(arrow_err!(
899                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
900                filter.len()
901            ));
902        }
903        match filter.null_count() {
904            0 => filters.push(filter),
905            _ => filters.push(prep_null_mask_filter(&filter)),
906        };
907    }
908
909    let raw = RowSelection::from_filters(&filters);
910    Ok(match input_selection {
911        Some(selection) => selection.and_then(&raw),
912        None => raw,
913    })
914}
915
916#[cfg(test)]
917mod tests {
918    use std::cmp::min;
919    use std::collections::{HashMap, VecDeque};
920    use std::fmt::Formatter;
921    use std::fs::File;
922    use std::io::Seek;
923    use std::path::PathBuf;
924    use std::sync::Arc;
925
926    use bytes::Bytes;
927    use half::f16;
928    use num::PrimInt;
929    use rand::{thread_rng, Rng, RngCore};
930    use tempfile::tempfile;
931
932    use arrow_array::builder::*;
933    use arrow_array::cast::AsArray;
934    use arrow_array::types::{
935        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
936        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
937    };
938    use arrow_array::*;
939    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
940    use arrow_data::{ArrayData, ArrayDataBuilder};
941    use arrow_schema::{
942        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
943    };
944    use arrow_select::concat::concat_batches;
945
946    use crate::arrow::arrow_reader::{
947        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
948        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
949    };
950    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
951    use crate::arrow::{ArrowWriter, ProjectionMask};
952    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
953    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
954    use crate::data_type::{
955        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
956        FloatType, Int32Type, Int64Type, Int96Type,
957    };
958    use crate::errors::Result;
959    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
960    use crate::file::writer::SerializedFileWriter;
961    use crate::schema::parser::parse_message_type;
962    use crate::schema::types::{Type, TypePtr};
963    use crate::util::test_common::rand_gen::RandGen;
964
965    #[test]
966    fn test_arrow_reader_all_columns() {
967        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
968
969        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
970        let original_schema = Arc::clone(builder.schema());
971        let reader = builder.build().unwrap();
972
973        // Verify that the schema was correctly parsed
974        assert_eq!(original_schema.fields(), reader.schema().fields());
975    }
976
977    #[test]
978    fn test_arrow_reader_single_column() {
979        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
980
981        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
982        let original_schema = Arc::clone(builder.schema());
983
984        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
985        let reader = builder.with_projection(mask).build().unwrap();
986
987        // Verify that the schema was correctly parsed
988        assert_eq!(1, reader.schema().fields().len());
989        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
990    }
991
992    #[test]
993    fn test_arrow_reader_single_column_by_name() {
994        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
995
996        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
997        let original_schema = Arc::clone(builder.schema());
998
999        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1000        let reader = builder.with_projection(mask).build().unwrap();
1001
1002        // Verify that the schema was correctly parsed
1003        assert_eq!(1, reader.schema().fields().len());
1004        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1005    }
1006
1007    #[test]
1008    fn test_null_column_reader_test() {
1009        let mut file = tempfile::tempfile().unwrap();
1010
1011        let schema = "
1012            message message {
1013                OPTIONAL INT32 int32;
1014            }
1015        ";
1016        let schema = Arc::new(parse_message_type(schema).unwrap());
1017
1018        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1019        generate_single_column_file_with_data::<Int32Type>(
1020            &[vec![], vec![]],
1021            Some(&def_levels),
1022            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
1023            schema,
1024            Some(Field::new("int32", ArrowDataType::Null, true)),
1025            &Default::default(),
1026        )
1027        .unwrap();
1028
1029        file.rewind().unwrap();
1030
1031        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1032        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1033
1034        assert_eq!(batches.len(), 4);
1035        for batch in &batches[0..3] {
1036            assert_eq!(batch.num_rows(), 2);
1037            assert_eq!(batch.num_columns(), 1);
1038            assert_eq!(batch.column(0).null_count(), 2);
1039        }
1040
1041        assert_eq!(batches[3].num_rows(), 1);
1042        assert_eq!(batches[3].num_columns(), 1);
1043        assert_eq!(batches[3].column(0).null_count(), 1);
1044    }
1045
1046    #[test]
1047    fn test_primitive_single_column_reader_test() {
1048        run_single_column_reader_tests::<BoolType, _, BoolType>(
1049            2,
1050            ConvertedType::NONE,
1051            None,
1052            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1053            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1054        );
1055        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1056            2,
1057            ConvertedType::NONE,
1058            None,
1059            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1060            &[
1061                Encoding::PLAIN,
1062                Encoding::RLE_DICTIONARY,
1063                Encoding::DELTA_BINARY_PACKED,
1064                Encoding::BYTE_STREAM_SPLIT,
1065            ],
1066        );
1067        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1068            2,
1069            ConvertedType::NONE,
1070            None,
1071            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1072            &[
1073                Encoding::PLAIN,
1074                Encoding::RLE_DICTIONARY,
1075                Encoding::DELTA_BINARY_PACKED,
1076                Encoding::BYTE_STREAM_SPLIT,
1077            ],
1078        );
1079        run_single_column_reader_tests::<FloatType, _, FloatType>(
1080            2,
1081            ConvertedType::NONE,
1082            None,
1083            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1084            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1085        );
1086    }
1087
1088    #[test]
1089    fn test_unsigned_primitive_single_column_reader_test() {
1090        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1091            2,
1092            ConvertedType::UINT_32,
1093            Some(ArrowDataType::UInt32),
1094            |vals| {
1095                Arc::new(UInt32Array::from_iter(
1096                    vals.iter().map(|x| x.map(|x| x as u32)),
1097                ))
1098            },
1099            &[
1100                Encoding::PLAIN,
1101                Encoding::RLE_DICTIONARY,
1102                Encoding::DELTA_BINARY_PACKED,
1103            ],
1104        );
1105        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1106            2,
1107            ConvertedType::UINT_64,
1108            Some(ArrowDataType::UInt64),
1109            |vals| {
1110                Arc::new(UInt64Array::from_iter(
1111                    vals.iter().map(|x| x.map(|x| x as u64)),
1112                ))
1113            },
1114            &[
1115                Encoding::PLAIN,
1116                Encoding::RLE_DICTIONARY,
1117                Encoding::DELTA_BINARY_PACKED,
1118            ],
1119        );
1120    }
1121
1122    #[test]
1123    fn test_unsigned_roundtrip() {
1124        let schema = Arc::new(Schema::new(vec![
1125            Field::new("uint32", ArrowDataType::UInt32, true),
1126            Field::new("uint64", ArrowDataType::UInt64, true),
1127        ]));
1128
1129        let mut buf = Vec::with_capacity(1024);
1130        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1131
1132        let original = RecordBatch::try_new(
1133            schema,
1134            vec![
1135                Arc::new(UInt32Array::from_iter_values([
1136                    0,
1137                    i32::MAX as u32,
1138                    u32::MAX,
1139                ])),
1140                Arc::new(UInt64Array::from_iter_values([
1141                    0,
1142                    i64::MAX as u64,
1143                    u64::MAX,
1144                ])),
1145            ],
1146        )
1147        .unwrap();
1148
1149        writer.write(&original).unwrap();
1150        writer.close().unwrap();
1151
1152        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1153        let ret = reader.next().unwrap().unwrap();
1154        assert_eq!(ret, original);
1155
1156        // Check they can be downcast to the correct type
1157        ret.column(0)
1158            .as_any()
1159            .downcast_ref::<UInt32Array>()
1160            .unwrap();
1161
1162        ret.column(1)
1163            .as_any()
1164            .downcast_ref::<UInt64Array>()
1165            .unwrap();
1166    }
1167
1168    #[test]
1169    fn test_float16_roundtrip() -> Result<()> {
1170        let schema = Arc::new(Schema::new(vec![
1171            Field::new("float16", ArrowDataType::Float16, false),
1172            Field::new("float16-nullable", ArrowDataType::Float16, true),
1173        ]));
1174
1175        let mut buf = Vec::with_capacity(1024);
1176        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1177
1178        let original = RecordBatch::try_new(
1179            schema,
1180            vec![
1181                Arc::new(Float16Array::from_iter_values([
1182                    f16::EPSILON,
1183                    f16::MIN,
1184                    f16::MAX,
1185                    f16::NAN,
1186                    f16::INFINITY,
1187                    f16::NEG_INFINITY,
1188                    f16::ONE,
1189                    f16::NEG_ONE,
1190                    f16::ZERO,
1191                    f16::NEG_ZERO,
1192                    f16::E,
1193                    f16::PI,
1194                    f16::FRAC_1_PI,
1195                ])),
1196                Arc::new(Float16Array::from(vec![
1197                    None,
1198                    None,
1199                    None,
1200                    Some(f16::NAN),
1201                    Some(f16::INFINITY),
1202                    Some(f16::NEG_INFINITY),
1203                    None,
1204                    None,
1205                    None,
1206                    None,
1207                    None,
1208                    None,
1209                    Some(f16::FRAC_1_PI),
1210                ])),
1211            ],
1212        )?;
1213
1214        writer.write(&original)?;
1215        writer.close()?;
1216
1217        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1218        let ret = reader.next().unwrap()?;
1219        assert_eq!(ret, original);
1220
1221        // Ensure can be downcast to the correct type
1222        ret.column(0).as_primitive::<Float16Type>();
1223        ret.column(1).as_primitive::<Float16Type>();
1224
1225        Ok(())
1226    }
1227
1228    #[test]
1229    fn test_time_utc_roundtrip() -> Result<()> {
1230        let schema = Arc::new(Schema::new(vec![
1231            Field::new(
1232                "time_millis",
1233                ArrowDataType::Time32(TimeUnit::Millisecond),
1234                true,
1235            )
1236            .with_metadata(HashMap::from_iter(vec![(
1237                "adjusted_to_utc".to_string(),
1238                "".to_string(),
1239            )])),
1240            Field::new(
1241                "time_micros",
1242                ArrowDataType::Time64(TimeUnit::Microsecond),
1243                true,
1244            )
1245            .with_metadata(HashMap::from_iter(vec![(
1246                "adjusted_to_utc".to_string(),
1247                "".to_string(),
1248            )])),
1249        ]));
1250
1251        let mut buf = Vec::with_capacity(1024);
1252        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1253
1254        let original = RecordBatch::try_new(
1255            schema,
1256            vec![
1257                Arc::new(Time32MillisecondArray::from(vec![
1258                    Some(-1),
1259                    Some(0),
1260                    Some(86_399_000),
1261                    Some(86_400_000),
1262                    Some(86_401_000),
1263                    None,
1264                ])),
1265                Arc::new(Time64MicrosecondArray::from(vec![
1266                    Some(-1),
1267                    Some(0),
1268                    Some(86_399 * 1_000_000),
1269                    Some(86_400 * 1_000_000),
1270                    Some(86_401 * 1_000_000),
1271                    None,
1272                ])),
1273            ],
1274        )?;
1275
1276        writer.write(&original)?;
1277        writer.close()?;
1278
1279        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1280        let ret = reader.next().unwrap()?;
1281        assert_eq!(ret, original);
1282
1283        // Ensure can be downcast to the correct type
1284        ret.column(0).as_primitive::<Time32MillisecondType>();
1285        ret.column(1).as_primitive::<Time64MicrosecondType>();
1286
1287        Ok(())
1288    }
1289
1290    #[test]
1291    fn test_date32_roundtrip() -> Result<()> {
1292        use arrow_array::Date32Array;
1293
1294        let schema = Arc::new(Schema::new(vec![Field::new(
1295            "date32",
1296            ArrowDataType::Date32,
1297            false,
1298        )]));
1299
1300        let mut buf = Vec::with_capacity(1024);
1301
1302        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1303
1304        let original = RecordBatch::try_new(
1305            schema,
1306            vec![Arc::new(Date32Array::from(vec![
1307                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1308            ]))],
1309        )?;
1310
1311        writer.write(&original)?;
1312        writer.close()?;
1313
1314        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1315        let ret = reader.next().unwrap()?;
1316        assert_eq!(ret, original);
1317
1318        // Ensure can be downcast to the correct type
1319        ret.column(0).as_primitive::<Date32Type>();
1320
1321        Ok(())
1322    }
1323
1324    #[test]
1325    fn test_date64_roundtrip() -> Result<()> {
1326        use arrow_array::Date64Array;
1327
1328        let schema = Arc::new(Schema::new(vec![
1329            Field::new("small-date64", ArrowDataType::Date64, false),
1330            Field::new("big-date64", ArrowDataType::Date64, false),
1331            Field::new("invalid-date64", ArrowDataType::Date64, false),
1332        ]));
1333
1334        let mut default_buf = Vec::with_capacity(1024);
1335        let mut coerce_buf = Vec::with_capacity(1024);
1336
1337        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1338
1339        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1340        let mut coerce_writer =
1341            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1342
1343        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1344
1345        let original = RecordBatch::try_new(
1346            schema,
1347            vec![
1348                // small-date64
1349                Arc::new(Date64Array::from(vec![
1350                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1351                    -1_000 * NUM_MILLISECONDS_IN_DAY,
1352                    0,
1353                    1_000 * NUM_MILLISECONDS_IN_DAY,
1354                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
1355                ])),
1356                // big-date64
1357                Arc::new(Date64Array::from(vec![
1358                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1359                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1360                    0,
1361                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1362                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1363                ])),
1364                // invalid-date64
1365                Arc::new(Date64Array::from(vec![
1366                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1367                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1368                    1,
1369                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1370                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1371                ])),
1372            ],
1373        )?;
1374
1375        default_writer.write(&original)?;
1376        coerce_writer.write(&original)?;
1377
1378        default_writer.close()?;
1379        coerce_writer.close()?;
1380
1381        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1382        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1383
1384        let default_ret = default_reader.next().unwrap()?;
1385        let coerce_ret = coerce_reader.next().unwrap()?;
1386
1387        // Roundtrip should be successful when default writer used
1388        assert_eq!(default_ret, original);
1389
1390        // Only small-date64 should roundtrip successfully when coerce_types writer is used
1391        assert_eq!(coerce_ret.column(0), original.column(0));
1392        assert_ne!(coerce_ret.column(1), original.column(1));
1393        assert_ne!(coerce_ret.column(2), original.column(2));
1394
1395        // Ensure both can be downcast to the correct type
1396        default_ret.column(0).as_primitive::<Date64Type>();
1397        coerce_ret.column(0).as_primitive::<Date64Type>();
1398
1399        Ok(())
1400    }
1401    struct RandFixedLenGen {}
1402
1403    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1404        fn gen(len: i32) -> FixedLenByteArray {
1405            let mut v = vec![0u8; len as usize];
1406            thread_rng().fill_bytes(&mut v);
1407            ByteArray::from(v).into()
1408        }
1409    }
1410
1411    #[test]
1412    fn test_fixed_length_binary_column_reader() {
1413        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1414            20,
1415            ConvertedType::NONE,
1416            None,
1417            |vals| {
1418                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1419                for val in vals {
1420                    match val {
1421                        Some(b) => builder.append_value(b).unwrap(),
1422                        None => builder.append_null(),
1423                    }
1424                }
1425                Arc::new(builder.finish())
1426            },
1427            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1428        );
1429    }
1430
1431    #[test]
1432    fn test_interval_day_time_column_reader() {
1433        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1434            12,
1435            ConvertedType::INTERVAL,
1436            None,
1437            |vals| {
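                // Parquet INTERVAL values are 12 little-endian bytes: months,
                // days and milliseconds (4 bytes each); only the day and
                // millisecond parts are carried into IntervalDayTime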
1438                Arc::new(
1439                    vals.iter()
1440                        .map(|x| {
1441                            x.as_ref().map(|b| IntervalDayTime {
1442                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1443                                milliseconds: i32::from_le_bytes(
1444                                    b.as_ref()[8..12].try_into().unwrap(),
1445                                ),
1446                            })
1447                        })
1448                        .collect::<IntervalDayTimeArray>(),
1449                )
1450            },
1451            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1452        );
1453    }
1454
1455    #[test]
1456    fn test_int96_single_column_reader_test() {
1457        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1458        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1459            2,
1460            ConvertedType::NONE,
1461            None,
1462            |vals| {
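                // INT96 values are read back as nanosecond timestamps, so
                // build the expected values with Int96::to_nanos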
1463                Arc::new(TimestampNanosecondArray::from_iter(
1464                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
1465                )) as _
1466            },
1467            encodings,
1468        );
1469    }
1470
1471    struct RandUtf8Gen {}
1472
1473    impl RandGen<ByteArrayType> for RandUtf8Gen {
1474        fn gen(len: i32) -> ByteArray {
1475            Int32Type::gen(len).to_string().as_str().into()
1476        }
1477    }
1478
1479    #[test]
1480    fn test_utf8_single_column_reader_test() {
1481        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1482            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1483                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1484            })))
1485        }
1486
1487        let encodings = &[
1488            Encoding::PLAIN,
1489            Encoding::RLE_DICTIONARY,
1490            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1491            Encoding::DELTA_BYTE_ARRAY,
1492        ];
1493
1494        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1495            2,
1496            ConvertedType::NONE,
1497            None,
1498            |vals| {
1499                Arc::new(BinaryArray::from_iter(
1500                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1501                ))
1502            },
1503            encodings,
1504        );
1505
1506        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1507            2,
1508            ConvertedType::UTF8,
1509            None,
1510            string_converter::<i32>,
1511            encodings,
1512        );
1513
1514        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1515            2,
1516            ConvertedType::UTF8,
1517            Some(ArrowDataType::Utf8),
1518            string_converter::<i32>,
1519            encodings,
1520        );
1521
1522        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1523            2,
1524            ConvertedType::UTF8,
1525            Some(ArrowDataType::LargeUtf8),
1526            string_converter::<i64>,
1527            encodings,
1528        );
1529
1530        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1531        for key in &small_key_types {
1532            for encoding in encodings {
1533                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1534                opts.encoding = *encoding;
1535
1536                let data_type =
1537                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1538
1539                // Cannot run full test suite as keys overflow, run small test instead
1540                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1541                    opts,
1542                    2,
1543                    ConvertedType::UTF8,
1544                    Some(data_type.clone()),
1545                    move |vals| {
1546                        let vals = string_converter::<i32>(vals);
1547                        arrow::compute::cast(&vals, &data_type).unwrap()
1548                    },
1549                );
1550            }
1551        }
1552
1553        let key_types = [
1554            ArrowDataType::Int16,
1555            ArrowDataType::UInt16,
1556            ArrowDataType::Int32,
1557            ArrowDataType::UInt32,
1558            ArrowDataType::Int64,
1559            ArrowDataType::UInt64,
1560        ];
1561
1562        for key in &key_types {
1563            let data_type =
1564                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1565
1566            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1567                2,
1568                ConvertedType::UTF8,
1569                Some(data_type.clone()),
1570                move |vals| {
1571                    let vals = string_converter::<i32>(vals);
1572                    arrow::compute::cast(&vals, &data_type).unwrap()
1573                },
1574                encodings,
1575            );
1576
1577            // https://github.com/apache/arrow-rs/issues/1179
1578            // let data_type = ArrowDataType::Dictionary(
1579            //     Box::new(key.clone()),
1580            //     Box::new(ArrowDataType::LargeUtf8),
1581            // );
1582            //
1583            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1584            //     2,
1585            //     ConvertedType::UTF8,
1586            //     Some(data_type.clone()),
1587            //     move |vals| {
1588            //         let vals = string_converter::<i64>(vals);
1589            //         arrow::compute::cast(&vals, &data_type).unwrap()
1590            //     },
1591            //     encodings,
1592            // );
1593        }
1594    }
1595
1596    #[test]
1597    fn test_decimal_nullable_struct() {
1598        let decimals = Decimal256Array::from_iter_values(
1599            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1600        );
1601
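        // Build an 8 row nullable struct; the null bitmap 0b11101111 marks
        // row 4 (the fifth row) as null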
1602        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1603            "decimals",
1604            decimals.data_type().clone(),
1605            false,
1606        )])))
1607        .len(8)
1608        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1609        .child_data(vec![decimals.into_data()])
1610        .build()
1611        .unwrap();
1612
1613        let written =
1614            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1615                .unwrap();
1616
1617        let mut buffer = Vec::with_capacity(1024);
1618        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1619        writer.write(&written).unwrap();
1620        writer.close().unwrap();
1621
1622        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1623            .unwrap()
1624            .collect::<Result<Vec<_>, _>>()
1625            .unwrap();
1626
1627        assert_eq!(&written.slice(0, 3), &read[0]);
1628        assert_eq!(&written.slice(3, 3), &read[1]);
1629        assert_eq!(&written.slice(6, 2), &read[2]);
1630    }
1631
1632    #[test]
1633    fn test_int32_nullable_struct() {
1634        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1635        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1636            "int32",
1637            int32.data_type().clone(),
1638            false,
1639        )])))
1640        .len(8)
1641        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1642        .child_data(vec![int32.into_data()])
1643        .build()
1644        .unwrap();
1645
1646        let written =
1647            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1648                .unwrap();
1649
1650        let mut buffer = Vec::with_capacity(1024);
1651        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1652        writer.write(&written).unwrap();
1653        writer.close().unwrap();
1654
1655        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1656            .unwrap()
1657            .collect::<Result<Vec<_>, _>>()
1658            .unwrap();
1659
1660        assert_eq!(&written.slice(0, 3), &read[0]);
1661        assert_eq!(&written.slice(3, 3), &read[1]);
1662        assert_eq!(&written.slice(6, 2), &read[2]);
1663    }
1664
1665    #[test]
1666    #[ignore] // https://github.com/apache/arrow-rs/issues/2253
1667    fn test_decimal_list() {
1668        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1669
1670        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
1671        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1672            decimals.data_type().clone(),
1673            false,
1674        ))))
1675        .len(7)
1676        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1677        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1678        .child_data(vec![decimals.into_data()])
1679        .build()
1680        .unwrap();
1681
1682        let written =
1683            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1684                .unwrap();
1685
1686        let mut buffer = Vec::with_capacity(1024);
1687        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1688        writer.write(&written).unwrap();
1689        writer.close().unwrap();
1690
1691        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1692            .unwrap()
1693            .collect::<Result<Vec<_>, _>>()
1694            .unwrap();
1695
1696        assert_eq!(&written.slice(0, 3), &read[0]);
1697        assert_eq!(&written.slice(3, 3), &read[1]);
1698        assert_eq!(&written.slice(6, 1), &read[2]);
1699    }
1700
1701    #[test]
1702    fn test_read_decimal_file() {
1703        use arrow_array::Decimal128Array;
1704        let testdata = arrow::util::test_util::parquet_test_data();
1705        let file_variants = vec![
1706            ("byte_array", 4),
1707            ("fixed_length", 25),
1708            ("int32", 4),
1709            ("int64", 10),
1710        ];
1711        for (prefix, target_precision) in file_variants {
1712            let path = format!("{testdata}/{prefix}_decimal.parquet");
1713            let file = File::open(path).unwrap();
1714            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1715
1716            let batch = record_reader.next().unwrap().unwrap();
1717            assert_eq!(batch.num_rows(), 24);
1718            let col = batch
1719                .column(0)
1720                .as_any()
1721                .downcast_ref::<Decimal128Array>()
1722                .unwrap();
1723
1724            let expected = 1..25;
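            // the file stores 1.00 through 24.00 with scale 2, so the raw
            // i128 representation of each value is v * 100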
1725
1726            assert_eq!(col.precision(), target_precision);
1727            assert_eq!(col.scale(), 2);
1728
1729            for (i, v) in expected.enumerate() {
1730                assert_eq!(col.value(i), v * 100_i128);
1731            }
1732        }
1733    }
1734
1735    #[test]
1736    fn test_read_float16_nonzeros_file() {
1737        use arrow_array::Float16Array;
1738        let testdata = arrow::util::test_util::parquet_test_data();
1739        // see https://github.com/apache/parquet-testing/pull/40
1740        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1741        let file = File::open(path).unwrap();
1742        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1743
1744        let batch = record_reader.next().unwrap().unwrap();
1745        assert_eq!(batch.num_rows(), 8);
1746        let col = batch
1747            .column(0)
1748            .as_any()
1749            .downcast_ref::<Float16Array>()
1750            .unwrap();
1751
1752        let f16_two = f16::ONE + f16::ONE;
1753
1754        assert_eq!(col.null_count(), 1);
1755        assert!(col.is_null(0));
1756        assert_eq!(col.value(1), f16::ONE);
1757        assert_eq!(col.value(2), -f16_two);
1758        assert!(col.value(3).is_nan());
1759        assert_eq!(col.value(4), f16::ZERO);
1760        assert!(col.value(4).is_sign_positive());
1761        assert_eq!(col.value(5), f16::NEG_ONE);
1762        assert_eq!(col.value(6), f16::NEG_ZERO);
1763        assert!(col.value(6).is_sign_negative());
1764        assert_eq!(col.value(7), f16_two);
1765    }
1766
1767    #[test]
1768    fn test_read_float16_zeros_file() {
1769        use arrow_array::Float16Array;
1770        let testdata = arrow::util::test_util::parquet_test_data();
1771        // see https://github.com/apache/parquet-testing/pull/40
1772        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1773        let file = File::open(path).unwrap();
1774        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1775
1776        let batch = record_reader.next().unwrap().unwrap();
1777        assert_eq!(batch.num_rows(), 3);
1778        let col = batch
1779            .column(0)
1780            .as_any()
1781            .downcast_ref::<Float16Array>()
1782            .unwrap();
1783
1784        assert_eq!(col.null_count(), 1);
1785        assert!(col.is_null(0));
1786        assert_eq!(col.value(1), f16::ZERO);
1787        assert!(col.value(1).is_sign_positive());
1788        assert!(col.value(2).is_nan());
1789    }
1790
1791    #[test]
1792    fn test_read_float32_float64_byte_stream_split() {
1793        let path = format!(
1794            "{}/byte_stream_split.zstd.parquet",
1795            arrow::util::test_util::parquet_test_data(),
1796        );
1797        let file = File::open(path).unwrap();
1798        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1799
1800        let mut row_count = 0;
1801        for batch in record_reader {
1802            let batch = batch.unwrap();
1803            row_count += batch.num_rows();
1804            let f32_col = batch.column(0).as_primitive::<Float32Type>();
1805            let f64_col = batch.column(1).as_primitive::<Float64Type>();
1806
1807            // This file contains floats from a standard normal distribution
1808            for &x in f32_col.values() {
1809                assert!(x > -10.0);
1810                assert!(x < 10.0);
1811            }
1812            for &x in f64_col.values() {
1813                assert!(x > -10.0);
1814                assert!(x < 10.0);
1815            }
1816        }
1817        assert_eq!(row_count, 300);
1818    }
1819
1820    #[test]
1821    fn test_read_extended_byte_stream_split() {
1822        let path = format!(
1823            "{}/byte_stream_split_extended.gzip.parquet",
1824            arrow::util::test_util::parquet_test_data(),
1825        );
1826        let file = File::open(path).unwrap();
1827        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1828
1829        let mut row_count = 0;
1830        for batch in record_reader {
1831            let batch = batch.unwrap();
1832            row_count += batch.num_rows();
1833
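            // Each pair of columns below stores the same values, one column
            // encoded with BYTE_STREAM_SPLIT and one without; reading both
            // should produce identical arrays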
1834            // 0,1 are f16
1835            let f16_col = batch.column(0).as_primitive::<Float16Type>();
1836            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
1837            assert_eq!(f16_col.len(), f16_bss.len());
1838            f16_col
1839                .iter()
1840                .zip(f16_bss.iter())
1841                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1842
1843            // 2,3 are f32
1844            let f32_col = batch.column(2).as_primitive::<Float32Type>();
1845            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
1846            assert_eq!(f32_col.len(), f32_bss.len());
1847            f32_col
1848                .iter()
1849                .zip(f32_bss.iter())
1850                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1851
1852            // 4,5 are f64
1853            let f64_col = batch.column(4).as_primitive::<Float64Type>();
1854            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
1855            assert_eq!(f64_col.len(), f64_bss.len());
1856            f64_col
1857                .iter()
1858                .zip(f64_bss.iter())
1859                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1860
1861            // 6,7 are i32
1862            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
1863            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
1864            assert_eq!(i32_col.len(), i32_bss.len());
1865            i32_col
1866                .iter()
1867                .zip(i32_bss.iter())
1868                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1869
1870            // 8,9 are i64
1871            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
1872            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
1873            assert_eq!(i64_col.len(), i64_bss.len());
1874            i64_col
1875                .iter()
1876                .zip(i64_bss.iter())
1877                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1878
1879            // 10,11 are FLBA(5)
1880            let flba_col = batch.column(10).as_fixed_size_binary();
1881            let flba_bss = batch.column(11).as_fixed_size_binary();
1882            assert_eq!(flba_col.len(), flba_bss.len());
1883            flba_col
1884                .iter()
1885                .zip(flba_bss.iter())
1886                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1887
1888            // 12,13 are FLBA(4) (decimal(7,3))
1889            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
1890            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
1891            assert_eq!(dec_col.len(), dec_bss.len());
1892            dec_col
1893                .iter()
1894                .zip(dec_bss.iter())
1895                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1896        }
1897        assert_eq!(row_count, 200);
1898    }
1899
1900    #[test]
1901    fn test_read_incorrect_map_schema_file() {
1902        let testdata = arrow::util::test_util::parquet_test_data();
1903        // see https://github.com/apache/parquet-testing/pull/47
1904        let path = format!("{testdata}/incorrect_map_schema.parquet");
1905        let file = File::open(path).unwrap();
1906        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1907
1908        let batch = record_reader.next().unwrap().unwrap();
1909        assert_eq!(batch.num_rows(), 1);
1910
1911        let expected_schema = Schema::new(Fields::from(vec![Field::new(
1912            "my_map",
1913            ArrowDataType::Map(
1914                Arc::new(Field::new(
1915                    "key_value",
1916                    ArrowDataType::Struct(Fields::from(vec![
1917                        Field::new("key", ArrowDataType::Utf8, false),
1918                        Field::new("value", ArrowDataType::Utf8, true),
1919                    ])),
1920                    false,
1921                )),
1922                false,
1923            ),
1924            true,
1925        )]));
1926        assert_eq!(batch.schema().as_ref(), &expected_schema);
1927
1928        assert_eq!(batch.num_rows(), 1);
1929        assert_eq!(batch.column(0).null_count(), 0);
1930        assert_eq!(
1931            batch.column(0).as_map().keys().as_ref(),
1932            &StringArray::from(vec!["parent", "name"])
1933        );
1934        assert_eq!(
1935            batch.column(0).as_map().values().as_ref(),
1936            &StringArray::from(vec!["another", "report"])
1937        );
1938    }
1939
1940    /// Parameters for single_column_reader_test
1941    #[derive(Clone)]
1942    struct TestOptions {
1943        /// Number of row groups to write to parquet (each row group
1944        /// contains `num_rows` rows)
1945        num_row_groups: usize,
1946        /// Total number of rows per row group
1947        num_rows: usize,
1948        /// Size of batches to read back
1949        record_batch_size: usize,
1950        /// Percentage of nulls in the column, or None if the column is required
1951        null_percent: Option<usize>,
1952        /// Set write batch size
1953        ///
1954        /// This is the number of rows that are written at once to a page and
1955        /// therefore acts as a bound on the page granularity of a row group
1956        write_batch_size: usize,
1957        /// Maximum size of page in bytes
1958        max_data_page_size: usize,
1959        /// Maximum size of dictionary page in bytes
1960        max_dict_page_size: usize,
1961        /// Writer version
1962        writer_version: WriterVersion,
1963        /// Enabled statistics
1964        enabled_statistics: EnabledStatistics,
1965        /// Encoding
1966        encoding: Encoding,
1967        /// row selections and total selected row count
1968        row_selections: Option<(RowSelection, usize)>,
1969        /// row filter
1970        row_filter: Option<Vec<bool>>,
1971        /// limit
1972        limit: Option<usize>,
1973        /// offset
1974        offset: Option<usize>,
1975    }
1976
1977    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
1978    impl std::fmt::Debug for TestOptions {
1979        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1980            f.debug_struct("TestOptions")
1981                .field("num_row_groups", &self.num_row_groups)
1982                .field("num_rows", &self.num_rows)
1983                .field("record_batch_size", &self.record_batch_size)
1984                .field("null_percent", &self.null_percent)
1985                .field("write_batch_size", &self.write_batch_size)
1986                .field("max_data_page_size", &self.max_data_page_size)
1987                .field("max_dict_page_size", &self.max_dict_page_size)
1988                .field("writer_version", &self.writer_version)
1989                .field("enabled_statistics", &self.enabled_statistics)
1990                .field("encoding", &self.encoding)
1991                .field("row_selections", &self.row_selections.is_some())
1992                .field("row_filter", &self.row_filter.is_some())
1993                .field("limit", &self.limit)
1994                .field("offset", &self.offset)
1995                .finish()
1996        }
1997    }
1998
1999    impl Default for TestOptions {
2000        fn default() -> Self {
2001            Self {
2002                num_row_groups: 2,
2003                num_rows: 100,
2004                record_batch_size: 15,
2005                null_percent: None,
2006                write_batch_size: 64,
2007                max_data_page_size: 1024 * 1024,
2008                max_dict_page_size: 1024 * 1024,
2009                writer_version: WriterVersion::PARQUET_1_0,
2010                enabled_statistics: EnabledStatistics::Page,
2011                encoding: Encoding::PLAIN,
2012                row_selections: None,
2013                row_filter: None,
2014                limit: None,
2015                offset: None,
2016            }
2017        }
2018    }
2019
2020    impl TestOptions {
2021        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2022            Self {
2023                num_row_groups,
2024                num_rows,
2025                record_batch_size,
2026                ..Default::default()
2027            }
2028        }
2029
2030        fn with_null_percent(self, null_percent: usize) -> Self {
2031            Self {
2032                null_percent: Some(null_percent),
2033                ..self
2034            }
2035        }
2036
2037        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2038            Self {
2039                max_data_page_size,
2040                ..self
2041            }
2042        }
2043
2044        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2045            Self {
2046                max_dict_page_size,
2047                ..self
2048            }
2049        }
2050
2051        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2052            Self {
2053                enabled_statistics,
2054                ..self
2055            }
2056        }
2057
2058        fn with_row_selections(self) -> Self {
2059            assert!(self.row_filter.is_none(), "Must set row selection first");
2060
2061            let mut rng = thread_rng();
2062            let step = rng.gen_range(self.record_batch_size..self.num_rows);
2063            let row_selections =
2064                create_test_selection(step, self.num_row_groups * self.num_rows, rng.gen::<bool>());
2065            Self {
2066                row_selections: Some(row_selections),
2067                ..self
2068            }
2069        }
2070
2071        fn with_row_filter(self) -> Self {
2072            let row_count = match &self.row_selections {
2073                Some((_, count)) => *count,
2074                None => self.num_row_groups * self.num_rows,
2075            };
2076
2077            let mut rng = thread_rng();
2078            Self {
2079                row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
2080                ..self
2081            }
2082        }
2083
2084        fn with_limit(self, limit: usize) -> Self {
2085            Self {
2086                limit: Some(limit),
2087                ..self
2088            }
2089        }
2090
2091        fn with_offset(self, offset: usize) -> Self {
2092            Self {
2093                offset: Some(offset),
2094                ..self
2095            }
2096        }
2097
2098        fn writer_props(&self) -> WriterProperties {
2099            let builder = WriterProperties::builder()
2100                .set_data_page_size_limit(self.max_data_page_size)
2101                .set_write_batch_size(self.write_batch_size)
2102                .set_writer_version(self.writer_version)
2103                .set_statistics_enabled(self.enabled_statistics);
2104
2105            let builder = match self.encoding {
2106                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2107                    .set_dictionary_enabled(true)
2108                    .set_dictionary_page_size_limit(self.max_dict_page_size),
2109                _ => builder
2110                    .set_dictionary_enabled(false)
2111                    .set_encoding(self.encoding),
2112            };
2113
2114            builder.build()
2115        }
2116    }
2117
2118    /// Create parquet files and then read them back with a
2119    /// `ParquetRecordBatchReader`, exercising a standard set of
2120    /// `TestOptions`.
2121    ///
2122    /// `rand_max` represents the maximum size of value to pass to the
2123    /// value generator.
2124    fn run_single_column_reader_tests<T, F, G>(
2125        rand_max: i32,
2126        converted_type: ConvertedType,
2127        arrow_type: Option<ArrowDataType>,
2128        converter: F,
2129        encodings: &[Encoding],
2130    ) where
2131        T: DataType,
2132        G: RandGen<T>,
2133        F: Fn(&[Option<T::T>]) -> ArrayRef,
2134    {
2135        let all_options = vec![
2136            // choose record_batch_size (15) so batches cross row
2137            // group boundaries (2 row groups of 100 rows each)
2138            TestOptions::new(2, 100, 15),
2139            // choose record_batch_size (5) so batches sometimes fall
2140            // on row group boundaries (3 row groups of 25 rows each).
2141            // Tests buffer refilling edge cases around row group
2142            // boundaries.
2143            TestOptions::new(3, 25, 5),
2144            // Choose record_batch_size (25) so row group boundaries
2145            // always coincide with batch boundaries. Tests buffer
2146            // refilling edge cases.
2147            TestOptions::new(4, 100, 25),
2148            // Set maximum page size so row groups have multiple pages
2149            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2150            // Set small dictionary page size to test dictionary fallback
2151            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2152            // Test optional but with no nulls
2153            TestOptions::new(2, 256, 127).with_null_percent(0),
2154            // Test optional with nulls
2155            TestOptions::new(2, 256, 93).with_null_percent(25),
2156            // Test with limit of 0
2157            TestOptions::new(4, 100, 25).with_limit(0),
2158            // Test with limit of 50
2159            TestOptions::new(4, 100, 25).with_limit(50),
2160            // Test with limit smaller than a single batch
2161            TestOptions::new(4, 100, 25).with_limit(10),
2162            // Test with limit larger than the number of rows in a row group
2163            TestOptions::new(4, 100, 25).with_limit(101),
2164            // Test with limit + offset less than the number of rows in a row group
2165            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2166            // Test with limit + offset equal to the number of rows in a row group
2167            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2168            // Test with limit + offset larger than the number of rows in a row group
2169            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2170            // Test with no page-level statistics
2171            TestOptions::new(2, 256, 91)
2172                .with_null_percent(25)
2173                .with_enabled_statistics(EnabledStatistics::Chunk),
2174            // Test with no statistics
2175            TestOptions::new(2, 256, 91)
2176                .with_null_percent(25)
2177                .with_enabled_statistics(EnabledStatistics::None),
2178            // Test with all null
2179            TestOptions::new(2, 128, 91)
2180                .with_null_percent(100)
2181                .with_enabled_statistics(EnabledStatistics::None),
2182            // Test skip
2183
2184            // choose record_batch_size (15) so batches cross row
2185            // group boundaries (2 row groups of 100 rows each)
2186            TestOptions::new(2, 100, 15).with_row_selections(),
2187            // choose record_batch_size (5) so batches sometimes fall
2188            // on row group boundaries (3 row groups of 25 rows each).
2189            // Tests buffer refilling edge cases around row group
2190            // boundaries.
2191            TestOptions::new(3, 25, 5).with_row_selections(),
2192            // Choose record_batch_size (25) so row group boundaries
2193            // always coincide with batch boundaries. Tests buffer
2194            // refilling edge cases.
2195            TestOptions::new(4, 100, 25).with_row_selections(),
2196            // Set maximum page size so row groups have multiple pages
2197            TestOptions::new(3, 256, 73)
2198                .with_max_data_page_size(128)
2199                .with_row_selections(),
2200            // Set small dictionary page size to test dictionary fallback
2201            TestOptions::new(3, 256, 57)
2202                .with_max_dict_page_size(128)
2203                .with_row_selections(),
2204            // Test optional but with no nulls
2205            TestOptions::new(2, 256, 127)
2206                .with_null_percent(0)
2207                .with_row_selections(),
2208            // Test optional with nulls
2209            TestOptions::new(2, 256, 93)
2210                .with_null_percent(25)
2211                .with_row_selections(),
2212            // Test optional with nulls
2213            TestOptions::new(2, 256, 93)
2214                .with_null_percent(25)
2215                .with_row_selections()
2216                .with_limit(10),
2217            // Test optional with nulls
2218            TestOptions::new(2, 256, 93)
2219                .with_null_percent(25)
2220                .with_row_selections()
2221                .with_offset(20)
2222                .with_limit(10),
2223            // Test filter
2224
2225            // Test with row filter
2226            TestOptions::new(4, 100, 25).with_row_filter(),
2227            // Test with row selection and row filter
2228            TestOptions::new(4, 100, 25)
2229                .with_row_selections()
2230                .with_row_filter(),
2231            // Test with nulls, row filter and small pages
2232            TestOptions::new(2, 256, 93)
2233                .with_null_percent(25)
2234                .with_max_data_page_size(10)
2235                .with_row_filter(),
2236            // Test with nulls, row selection, row filter and small pages
2237            TestOptions::new(2, 256, 93)
2238                .with_null_percent(25)
2239                .with_max_data_page_size(10)
2240                .with_row_selections()
2241                .with_row_filter(),
2242            // Test with row selection and no offset index and small pages
2243            TestOptions::new(2, 256, 93)
2244                .with_enabled_statistics(EnabledStatistics::None)
2245                .with_max_data_page_size(10)
2246                .with_row_selections(),
2247        ];
2248
2249        all_options.into_iter().for_each(|opts| {
2250            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2251                for encoding in encodings {
2252                    let opts = TestOptions {
2253                        writer_version,
2254                        encoding: *encoding,
2255                        ..opts.clone()
2256                    };
2257
2258                    single_column_reader_test::<T, _, G>(
2259                        opts,
2260                        rand_max,
2261                        converted_type,
2262                        arrow_type.clone(),
2263                        &converter,
2264                    )
2265                }
2266            }
2267        });
2268    }
2269
2270    /// Create a parquet file and then read it back with a
2271    /// `ParquetRecordBatchReader`, using the parameters described in
2272    /// `opts`.
2273    fn single_column_reader_test<T, F, G>(
2274        opts: TestOptions,
2275        rand_max: i32,
2276        converted_type: ConvertedType,
2277        arrow_type: Option<ArrowDataType>,
2278        converter: F,
2279    ) where
2280        T: DataType,
2281        G: RandGen<T>,
2282        F: Fn(&[Option<T::T>]) -> ArrayRef,
2283    {
2284        // Print out options to facilitate debugging failures on CI
2285        println!(
2286            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2287            T::get_physical_type(), converted_type, arrow_type, opts
2288        );
2289
2290        // Generate def_levels according to null_percent
2291        let (repetition, def_levels) = match opts.null_percent.as_ref() {
2292            Some(null_percent) => {
2293                let mut rng = thread_rng();
2294
2295                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2296                    .map(|_| {
2297                        std::iter::from_fn(|| {
2298                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2299                        })
2300                        .take(opts.num_rows)
2301                        .collect()
2302                    })
2303                    .collect();
2304                (Repetition::OPTIONAL, Some(def_levels))
2305            }
2306            None => (Repetition::REQUIRED, None),
2307        };
2308
2309        // Generate random table data
2310        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2311            .map(|idx| {
2312                let null_count = match def_levels.as_ref() {
2313                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2314                    None => 0,
2315                };
2316                G::gen_vec(rand_max, opts.num_rows - null_count)
2317            })
2318            .collect();
2319
2320        let len = match T::get_physical_type() {
2321            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2322            crate::basic::Type::INT96 => 12,
2323            _ => -1,
2324        };
2325
2326        let fields = vec![Arc::new(
2327            Type::primitive_type_builder("leaf", T::get_physical_type())
2328                .with_repetition(repetition)
2329                .with_converted_type(converted_type)
2330                .with_length(len)
2331                .build()
2332                .unwrap(),
2333        )];
2334
2335        let schema = Arc::new(
2336            Type::group_type_builder("test_schema")
2337                .with_fields(fields)
2338                .build()
2339                .unwrap(),
2340        );
2341
2342        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2343
2344        let mut file = tempfile::tempfile().unwrap();
2345
2346        generate_single_column_file_with_data::<T>(
2347            &values,
2348            def_levels.as_ref(),
2349            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2350            schema,
2351            arrow_field,
2352            &opts,
2353        )
2354        .unwrap();
2355
2356        file.rewind().unwrap();
2357
2358        let options = ArrowReaderOptions::new()
2359            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2360
2361        let mut builder =
2362            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2363
2364        let expected_data = match opts.row_selections {
2365            Some((selections, row_count)) => {
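                // Apply the selection to the full expected data: rows covered
                // by a "skip" selector are dropped, rows covered by a "select"
                // selector are kept (despite the name, `skip_data` holds the
                // selected rows)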
2366                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2367
2368                let mut skip_data: Vec<Option<T::T>> = vec![];
2369                let dequeue: VecDeque<RowSelector> = selections.clone().into();
2370                for select in dequeue {
2371                    if select.skip {
2372                        without_skip_data.drain(0..select.row_count);
2373                    } else {
2374                        skip_data.extend(without_skip_data.drain(0..select.row_count));
2375                    }
2376                }
2377                builder = builder.with_row_selection(selections);
2378
2379                assert_eq!(skip_data.len(), row_count);
2380                skip_data
2381            }
2382            None => {
2383                // Get the flattened table data
2384                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2385                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2386                expected_data
2387            }
2388        };
2389
2390        let mut expected_data = match opts.row_filter {
2391            Some(filter) => {
2392                let expected_data = expected_data
2393                    .into_iter()
2394                    .zip(filter.iter())
2395                    .filter_map(|(d, f)| f.then(|| d))
2396                    .collect();
2397
2398                let mut filter_offset = 0;
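                // The predicate replays the precomputed boolean mask batch by
                // batch, tracking how many rows have been consumed so far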
2399                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2400                    ProjectionMask::all(),
2401                    move |b| {
2402                        let array = BooleanArray::from_iter(
2403                            filter
2404                                .iter()
2405                                .skip(filter_offset)
2406                                .take(b.num_rows())
2407                                .map(|x| Some(*x)),
2408                        );
2409                        filter_offset += b.num_rows();
2410                        Ok(array)
2411                    },
2412                ))]);
2413
2414                builder = builder.with_row_filter(filter);
2415                expected_data
2416            }
2417            None => expected_data,
2418        };
2419
2420        if let Some(offset) = opts.offset {
2421            builder = builder.with_offset(offset);
2422            expected_data = expected_data.into_iter().skip(offset).collect();
2423        }
2424
2425        if let Some(limit) = opts.limit {
2426            builder = builder.with_limit(limit);
2427            expected_data = expected_data.into_iter().take(limit).collect();
2428        }
2429
2430        let mut record_reader = builder
2431            .with_batch_size(opts.record_batch_size)
2432            .build()
2433            .unwrap();
2434
2435        let mut total_read = 0;
2436        loop {
2437            let maybe_batch = record_reader.next();
2438            if total_read < expected_data.len() {
2439                let end = min(total_read + opts.record_batch_size, expected_data.len());
2440                let batch = maybe_batch.unwrap().unwrap();
2441                assert_eq!(end - total_read, batch.num_rows());
2442
2443                let a = converter(&expected_data[total_read..end]);
2444                let b = Arc::clone(batch.column(0));
2445
2446                assert_eq!(a.data_type(), b.data_type());
2447                assert_eq!(a.to_data(), b.to_data());
2448                assert_eq!(
2449                    a.as_any().type_id(),
2450                    b.as_any().type_id(),
2451                    "incorrect type ids"
2452                );
2453
2454                total_read = end;
2455            } else {
2456                assert!(maybe_batch.is_none());
2457                break;
2458            }
2459        }
2460    }
2461
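    /// Flattens the per-row-group values into a single `Vec`, re-inserting
    /// `None` wherever the definition level is 0 (i.e. the value is null)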
2462    fn gen_expected_data<T: DataType>(
2463        def_levels: Option<&Vec<Vec<i16>>>,
2464        values: &[Vec<T::T>],
2465    ) -> Vec<Option<T::T>> {
2466        let data: Vec<Option<T::T>> = match def_levels {
2467            Some(levels) => {
2468                let mut values_iter = values.iter().flatten();
2469                levels
2470                    .iter()
2471                    .flatten()
2472                    .map(|d| match d {
2473                        1 => Some(values_iter.next().cloned().unwrap()),
2474                        0 => None,
2475                        _ => unreachable!(),
2476                    })
2477                    .collect()
2478            }
2479            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2480        };
2481        data
2482    }
2483
2484    fn generate_single_column_file_with_data<T: DataType>(
2485        values: &[Vec<T::T>],
2486        def_levels: Option<&Vec<Vec<i16>>>,
2487        file: File,
2488        schema: TypePtr,
2489        field: Option<Field>,
2490        opts: &TestOptions,
2491    ) -> Result<crate::format::FileMetaData> {
2492        let mut writer_props = opts.writer_props();
2493        if let Some(field) = field {
2494            let arrow_schema = Schema::new(vec![field]);
2495            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2496        }
2497
2498        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2499
2500        for (idx, v) in values.iter().enumerate() {
2501            let def_levels = def_levels.map(|d| d[idx].as_slice());
2502            let mut row_group_writer = writer.next_row_group()?;
2503            {
2504                let mut column_writer = row_group_writer
2505                    .next_column()?
2506                    .expect("Column writer is none!");
2507
2508                column_writer
2509                    .typed::<T>()
2510                    .write_batch(v, def_levels, None)?;
2511
2512                column_writer.close()?;
2513            }
2514            row_group_writer.close()?;
2515        }
2516
2517        writer.close()
2518    }
2519
2520    fn get_test_file(file_name: &str) -> File {
2521        let mut path = PathBuf::new();
2522        path.push(arrow::util::test_util::arrow_test_data());
2523        path.push(file_name);
2524
2525        File::open(path.as_path()).expect("File not found!")
2526    }
2527
2528    #[test]
2529    fn test_read_structs() {
2530        // This particular test file has columns of struct types where there is
2531        // a column that has the same name as one of the struct fields
2532        // (see: ARROW-11452)
2533        let testdata = arrow::util::test_util::parquet_test_data();
2534        let path = format!("{testdata}/nested_structs.rust.parquet");
2535        let file = File::open(&path).unwrap();
2536        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2537
2538        for batch in record_batch_reader {
2539            batch.unwrap();
2540        }
2541
2542        let file = File::open(&path).unwrap();
2543        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2544
2545        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2546        let projected_reader = builder
2547            .with_projection(mask)
2548            .with_batch_size(60)
2549            .build()
2550            .unwrap();
2551
2552        let expected_schema = Schema::new(vec![
2553            Field::new(
2554                "roll_num",
2555                ArrowDataType::Struct(Fields::from(vec![Field::new(
2556                    "count",
2557                    ArrowDataType::UInt64,
2558                    false,
2559                )])),
2560                false,
2561            ),
2562            Field::new(
2563                "PC_CUR",
2564                ArrowDataType::Struct(Fields::from(vec![
2565                    Field::new("mean", ArrowDataType::Int64, false),
2566                    Field::new("sum", ArrowDataType::Int64, false),
2567                ])),
2568                false,
2569            ),
2570        ]);
2571
2572        // Tests for #1652 and #1654
2573        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2574
2575        for batch in projected_reader {
2576            let batch = batch.unwrap();
2577            assert_eq!(batch.schema().as_ref(), &expected_schema);
2578        }
2579    }
2580
2581    #[test]
2582    // same as test_read_structs but constructs projection mask via column names
2583    fn test_read_structs_by_name() {
2584        let testdata = arrow::util::test_util::parquet_test_data();
2585        let path = format!("{testdata}/nested_structs.rust.parquet");
2586        let file = File::open(&path).unwrap();
2587        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2588
2589        for batch in record_batch_reader {
2590            batch.unwrap();
2591        }
2592
2593        let file = File::open(&path).unwrap();
2594        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2595
2596        let mask = ProjectionMask::columns(
2597            builder.parquet_schema(),
2598            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2599        );
2600        let projected_reader = builder
2601            .with_projection(mask)
2602            .with_batch_size(60)
2603            .build()
2604            .unwrap();
2605
2606        let expected_schema = Schema::new(vec![
2607            Field::new(
2608                "roll_num",
2609                ArrowDataType::Struct(Fields::from(vec![Field::new(
2610                    "count",
2611                    ArrowDataType::UInt64,
2612                    false,
2613                )])),
2614                false,
2615            ),
2616            Field::new(
2617                "PC_CUR",
2618                ArrowDataType::Struct(Fields::from(vec![
2619                    Field::new("mean", ArrowDataType::Int64, false),
2620                    Field::new("sum", ArrowDataType::Int64, false),
2621                ])),
2622                false,
2623            ),
2624        ]);
2625
2626        assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2627
2628        for batch in projected_reader {
2629            let batch = batch.unwrap();
2630            assert_eq!(batch.schema().as_ref(), &expected_schema);
2631        }
2632    }
2633
2634    #[test]
2635    fn test_read_maps() {
2636        let testdata = arrow::util::test_util::parquet_test_data();
2637        let path = format!("{testdata}/nested_maps.snappy.parquet");
2638        let file = File::open(path).unwrap();
2639        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2640
2641        for batch in record_batch_reader {
2642            batch.unwrap();
2643        }
2644    }
2645
2646    #[test]
2647    fn test_nested_nullability() {
2648        let message_type = "message nested {
2649          OPTIONAL Group group {
2650            REQUIRED INT32 leaf;
2651          }
2652        }";
2653
2654        let file = tempfile::tempfile().unwrap();
2655        let schema = Arc::new(parse_message_type(message_type).unwrap());
2656
2657        {
2658            // Write using low-level parquet API (#1167)
2659            let mut writer =
2660                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2661                    .unwrap();
2662
2663            {
2664                let mut row_group_writer = writer.next_row_group().unwrap();
2665                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2666
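                // def_levels [0, 1, 0, 1]: rows 0 and 2 have a null `group`,
                // rows 1 and 3 carry the leaf values 34 and 76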
2667                column_writer
2668                    .typed::<Int32Type>()
2669                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2670                    .unwrap();
2671
2672                column_writer.close().unwrap();
2673                row_group_writer.close().unwrap();
2674            }
2675
2676            writer.close().unwrap();
2677        }
2678
2679        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2680        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2681
2682        let reader = builder.with_projection(mask).build().unwrap();
2683
2684        let expected_schema = Schema::new(Fields::from(vec![Field::new(
2685            "group",
2686            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2687            true,
2688        )]));
2689
2690        let batch = reader.into_iter().next().unwrap().unwrap();
2691        assert_eq!(batch.schema().as_ref(), &expected_schema);
2692        assert_eq!(batch.num_rows(), 4);
2693        assert_eq!(batch.column(0).null_count(), 2);
2694    }
2695
2696    #[test]
2697    fn test_invalid_utf8() {
2698        // a parquet file with 1 column with invalid utf8
2699        let data = vec![
2700            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2701            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2702            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2703            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2704            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2705            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2706            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2707            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2708            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2709            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2710            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2711            82, 49,
2712        ];
2713
2714        let file = Bytes::from(data);
2715        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2716
2717        let error = record_batch_reader.next().unwrap().unwrap_err();
2718
2719        assert!(
2720            error.to_string().contains("invalid utf-8 sequence"),
2721            "{}",
2722            error
2723        );
2724    }
2725
2726    #[test]
2727    fn test_invalid_utf8_string_array() {
2728        test_invalid_utf8_string_array_inner::<i32>();
2729    }
2730
2731    #[test]
2732    fn test_invalid_utf8_large_string_array() {
2733        test_invalid_utf8_string_array_inner::<i64>();
2734    }
2735
2736    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2737        let cases = [
2738            invalid_utf8_first_char::<O>(),
2739            invalid_utf8_first_char_long_strings::<O>(),
2740            invalid_utf8_later_char::<O>(),
2741            invalid_utf8_later_char_long_strings::<O>(),
2742            invalid_utf8_later_char_really_long_strings::<O>(),
2743            invalid_utf8_later_char_really_long_strings2::<O>(),
2744        ];
2745        for array in &cases {
2746            for encoding in STRING_ENCODINGS {
2747                // the data is not valid utf8, so we cannot safely construct a
2748                // correct StringArray; purposely create an invalid StringArray
2749                let array = unsafe {
2750                    GenericStringArray::<O>::new_unchecked(
2751                        array.offsets().clone(),
2752                        array.values().clone(),
2753                        array.nulls().cloned(),
2754                    )
2755                };
2756                let data_type = array.data_type().clone();
2757                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2758                let err = read_from_parquet(data).unwrap_err();
2759                let expected_err =
2760                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2761                assert!(
2762                    err.to_string().contains(expected_err),
2763                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2764                );
2765            }
2766        }
2767    }
2768
2769    #[test]
2770    fn test_invalid_utf8_string_view_array() {
2771        let cases = [
2772            invalid_utf8_first_char::<i32>(),
2773            invalid_utf8_first_char_long_strings::<i32>(),
2774            invalid_utf8_later_char::<i32>(),
2775            invalid_utf8_later_char_long_strings::<i32>(),
2776            invalid_utf8_later_char_really_long_strings::<i32>(),
2777            invalid_utf8_later_char_really_long_strings2::<i32>(),
2778        ];
2779
2780        for encoding in STRING_ENCODINGS {
2781            for array in &cases {
2782                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
2783                let array = array.as_binary_view();
2784
2785                // the data is not valid utf8, so we cannot safely construct a
2786                // correct StringViewArray; purposely create an invalid StringViewArray
2787                let array = unsafe {
2788                    StringViewArray::new_unchecked(
2789                        array.views().clone(),
2790                        array.data_buffers().to_vec(),
2791                        array.nulls().cloned(),
2792                    )
2793                };
2794
2795                let data_type = array.data_type().clone();
2796                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2797                let err = read_from_parquet(data).unwrap_err();
2798                let expected_err =
2799                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
2800                assert!(
2801                    err.to_string().contains(expected_err),
2802                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2803                );
2804            }
2805        }
2806    }
2807
2808    /// Encodings suitable for string data
2809    const STRING_ENCODINGS: &[Option<Encoding>] = &[
2810        None,
2811        Some(Encoding::PLAIN),
2812        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
2813        Some(Encoding::DELTA_BYTE_ARRAY),
2814    ];
2815
    /// Invalid UTF-8 sequence in the first character
2817    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
2818    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
2819
    /// Invalid UTF-8 sequence in a character other than the first
2821    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
2822    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
2823
2824    /// returns a BinaryArray with invalid UTF8 data in the first character
2825    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2826        let valid: &[u8] = b"   ";
2827        let invalid = INVALID_UTF8_FIRST_CHAR;
2828        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2829    }
2830
2831    /// Returns a BinaryArray with invalid UTF8 data in the first character of a
2832    /// string larger than 12 bytes which is handled specially when reading
2833    /// `ByteViewArray`s
2834    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2835        let valid: &[u8] = b"   ";
2836        let mut invalid = vec![];
2837        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2838        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2839        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2840    }
2841
2842    /// returns a BinaryArray with invalid UTF8 data in a character other than
2843    /// the first (this is checked in a special codepath)
2844    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2845        let valid: &[u8] = b"   ";
2846        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
2847        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2848    }
2849
2850    /// returns a BinaryArray with invalid UTF8 data in a character other than
2851    /// the first in a string larger than 12 bytes which is handled specially
2852    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2853    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2854        let valid: &[u8] = b"   ";
2855        let mut invalid = vec![];
2856        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2857        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2858        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2859    }
2860
2861    /// returns a BinaryArray with invalid UTF8 data in a character other than
2862    /// the first in a string larger than 128 bytes which is handled specially
2863    /// when reading `ByteViewArray`s (this is checked in a special codepath)
2864    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2865        let valid: &[u8] = b"   ";
2866        let mut invalid = vec![];
2867        for _ in 0..10 {
2868            // each instance is 38 bytes
2869            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2870        }
2871        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2872        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2873    }
2874
    /// returns a BinaryArray containing a short invalid UTF8 value mixed with valid
    /// values, including a valid string longer than 128 bytes which is handled
    /// specially when reading `ByteViewArray`s
2877    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2878        let valid: &[u8] = b"   ";
2879        let mut valid_long = vec![];
2880        for _ in 0..10 {
2881            // each instance is 38 bytes
2882            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2883        }
2884        let invalid = INVALID_UTF8_LATER_CHAR;
2885        GenericBinaryArray::<O>::from_iter(vec![
2886            None,
2887            Some(valid),
2888            Some(invalid),
2889            None,
2890            Some(&valid_long),
2891            Some(valid),
2892        ])
2893    }
2894
    /// Writes the array into a single-column parquet file with the specified
    /// encoding.
    ///
    /// If no encoding is specified, the default (dictionary) encoding is used
2899    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
2900        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
2901        let mut data = vec![];
2902        let schema = batch.schema();
2903        let props = encoding.map(|encoding| {
2904            WriterProperties::builder()
                // dictionary encoding must be disabled so the specified encoding is actually used
2906                .set_dictionary_enabled(false)
2907                .set_encoding(encoding)
2908                .build()
2909        });
2910
2911        {
2912            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
2913            writer.write(&batch).unwrap();
2914            writer.flush().unwrap();
2915            writer.close().unwrap();
2916        };
2917        data
2918    }
2919
    /// Reads the parquet file data back into record batches
2921    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
2922        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
2923            .unwrap()
2924            .build()
2925            .unwrap();
2926
2927        reader.collect()
2928    }
2929
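    /// Verifies that the dictionary is reused for batches read from the same row group
    /// and recomputed for a batch that spans a row group boundary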
2930    #[test]
2931    fn test_dictionary_preservation() {
2932        let fields = vec![Arc::new(
2933            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
2934                .with_repetition(Repetition::OPTIONAL)
2935                .with_converted_type(ConvertedType::UTF8)
2936                .build()
2937                .unwrap(),
2938        )];
2939
2940        let schema = Arc::new(
2941            Type::group_type_builder("test_schema")
2942                .with_fields(fields)
2943                .build()
2944                .unwrap(),
2945        );
2946
2947        let dict_type = ArrowDataType::Dictionary(
2948            Box::new(ArrowDataType::Int32),
2949            Box::new(ArrowDataType::Utf8),
2950        );
2951
2952        let arrow_field = Field::new("leaf", dict_type, true);
2953
2954        let mut file = tempfile::tempfile().unwrap();
2955
2956        let values = vec![
2957            vec![
2958                ByteArray::from("hello"),
2959                ByteArray::from("a"),
2960                ByteArray::from("b"),
2961                ByteArray::from("d"),
2962            ],
2963            vec![
2964                ByteArray::from("c"),
2965                ByteArray::from("a"),
2966                ByteArray::from("b"),
2967            ],
2968        ];
2969
2970        let def_levels = vec![
2971            vec![1, 0, 0, 1, 0, 0, 1, 1],
2972            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
2973        ];
2974
2975        let opts = TestOptions {
2976            encoding: Encoding::RLE_DICTIONARY,
2977            ..Default::default()
2978        };
2979
2980        generate_single_column_file_with_data::<ByteArrayType>(
2981            &values,
2982            Some(&def_levels),
2983            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
2984            schema,
2985            Some(arrow_field),
2986            &opts,
2987        )
2988        .unwrap();
2989
2990        file.rewind().unwrap();
2991
2992        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
2993
2994        let batches = record_reader
2995            .collect::<Result<Vec<RecordBatch>, _>>()
2996            .unwrap();
2997
2998        assert_eq!(batches.len(), 6);
2999        assert!(batches.iter().all(|x| x.num_columns() == 1));
3000
3001        let row_counts = batches
3002            .iter()
3003            .map(|x| (x.num_rows(), x.column(0).null_count()))
3004            .collect::<Vec<_>>();
3005
3006        assert_eq!(
3007            row_counts,
3008            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3009        );
3010
3011        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3012
3013        // First and second batch in same row group -> same dictionary
3014        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3015        // Third batch spans row group -> computed dictionary
3016        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3017        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3018        // Fourth, fifth and sixth from same row group -> same dictionary
3019        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3020        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3021    }
3022
3023    #[test]
3024    fn test_read_null_list() {
3025        let testdata = arrow::util::test_util::parquet_test_data();
3026        let path = format!("{testdata}/null_list.parquet");
3027        let file = File::open(path).unwrap();
3028        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3029
3030        let batch = record_batch_reader.next().unwrap().unwrap();
3031        assert_eq!(batch.num_rows(), 1);
3032        assert_eq!(batch.num_columns(), 1);
3033        assert_eq!(batch.column(0).len(), 1);
3034
3035        let list = batch
3036            .column(0)
3037            .as_any()
3038            .downcast_ref::<ListArray>()
3039            .unwrap();
3040        assert_eq!(list.len(), 1);
3041        assert!(list.is_valid(0));
3042
3043        let val = list.value(0);
3044        assert_eq!(val.len(), 0);
3045    }
3046
3047    #[test]
3048    fn test_null_schema_inference() {
3049        let testdata = arrow::util::test_util::parquet_test_data();
3050        let path = format!("{testdata}/null_list.parquet");
3051        let file = File::open(path).unwrap();
3052
3053        let arrow_field = Field::new(
3054            "emptylist",
3055            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3056            true,
3057        );
3058
3059        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3060        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3061        let schema = builder.schema();
3062        assert_eq!(schema.fields().len(), 1);
3063        assert_eq!(schema.field(0), &arrow_field);
3064    }
3065
3066    #[test]
3067    fn test_skip_metadata() {
3068        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3069        let field = Field::new("col", col.data_type().clone(), true);
3070
3071        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3072
3073        let metadata = [("key".to_string(), "value".to_string())]
3074            .into_iter()
3075            .collect();
3076
3077        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3078
3079        assert_ne!(schema_with_metadata, schema_without_metadata);
3080
3081        let batch =
3082            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3083
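        // Writes `batch` to a temporary file using the given writer version and returns the file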
3084        let file = |version: WriterVersion| {
3085            let props = WriterProperties::builder()
3086                .set_writer_version(version)
3087                .build();
3088
3089            let file = tempfile().unwrap();
3090            let mut writer =
3091                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3092                    .unwrap();
3093            writer.write(&batch).unwrap();
3094            writer.close().unwrap();
3095            file
3096        };
3097
3098        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3099
3100        let v1_reader = file(WriterVersion::PARQUET_1_0);
3101        let v2_reader = file(WriterVersion::PARQUET_2_0);
3102
3103        let arrow_reader =
3104            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3105        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3106
3107        let reader =
3108            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3109                .unwrap()
3110                .build()
3111                .unwrap();
3112        assert_eq!(reader.schema(), schema_without_metadata);
3113
3114        let arrow_reader =
3115            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3116        assert_eq!(arrow_reader.schema(), schema_with_metadata);
3117
3118        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3119            .unwrap()
3120            .build()
3121            .unwrap();
3122        assert_eq!(reader.schema(), schema_without_metadata);
3123    }
3124
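    /// Writes the given (name, array) pairs as a single record batch to a temporary parquet file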
3125    fn write_parquet_from_iter<I, F>(value: I) -> File
3126    where
3127        I: IntoIterator<Item = (F, ArrayRef)>,
3128        F: AsRef<str>,
3129    {
3130        let batch = RecordBatch::try_from_iter(value).unwrap();
3131        let file = tempfile().unwrap();
3132        let mut writer =
3133            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3134        writer.write(&batch).unwrap();
3135        writer.close().unwrap();
3136        file
3137    }
3138
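    /// Writes `value` to a parquet file and asserts that opening a reader with the
    /// supplied `schema` fails with `expected_error`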
3139    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3140    where
3141        I: IntoIterator<Item = (F, ArrayRef)>,
3142        F: AsRef<str>,
3143    {
3144        let file = write_parquet_from_iter(value);
3145        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3146        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3147            file.try_clone().unwrap(),
3148            options_with_schema,
3149        );
3150        assert_eq!(builder.err().unwrap().to_string(), expected_error);
3151    }
3152
3153    #[test]
3154    fn test_schema_too_few_columns() {
3155        run_schema_test_with_error(
3156            vec![
3157                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3158                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3159            ],
3160            Arc::new(Schema::new(vec![Field::new(
3161                "int64",
3162                ArrowDataType::Int64,
3163                false,
3164            )])),
3165            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3166        );
3167    }
3168
3169    #[test]
3170    fn test_schema_too_many_columns() {
3171        run_schema_test_with_error(
3172            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3173            Arc::new(Schema::new(vec![
3174                Field::new("int64", ArrowDataType::Int64, false),
3175                Field::new("int32", ArrowDataType::Int32, false),
3176            ])),
3177            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3178        );
3179    }
3180
3181    #[test]
3182    fn test_schema_mismatched_column_names() {
3183        run_schema_test_with_error(
3184            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3185            Arc::new(Schema::new(vec![Field::new(
3186                "other",
3187                ArrowDataType::Int64,
3188                false,
3189            )])),
3190            "Arrow: incompatible arrow schema, expected field named int64 got other",
3191        );
3192    }
3193
3194    #[test]
3195    fn test_schema_incompatible_columns() {
3196        run_schema_test_with_error(
3197            vec![
3198                (
3199                    "col1_invalid",
3200                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3201                ),
3202                (
3203                    "col2_valid",
3204                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3205                ),
3206                (
3207                    "col3_invalid",
3208                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3209                ),
3210            ],
3211            Arc::new(Schema::new(vec![
3212                Field::new("col1_invalid", ArrowDataType::Int32, false),
3213                Field::new("col2_valid", ArrowDataType::Int32, false),
3214                Field::new("col3_invalid", ArrowDataType::Int32, false),
3215            ])),
3216            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3217        );
3218    }
3219
3220    #[test]
3221    fn test_one_incompatible_nested_column() {
3222        let nested_fields = Fields::from(vec![
3223            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3224            Field::new("nested1_invalid", ArrowDataType::Int64, false),
3225        ]);
3226        let nested = StructArray::try_new(
3227            nested_fields,
3228            vec![
3229                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3230                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3231            ],
3232            None,
3233        )
3234        .expect("struct array");
3235        let supplied_nested_fields = Fields::from(vec![
3236            Field::new("nested1_valid", ArrowDataType::Utf8, false),
3237            Field::new("nested1_invalid", ArrowDataType::Int32, false),
3238        ]);
3239        run_schema_test_with_error(
3240            vec![
3241                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3242                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3243                ("nested", Arc::new(nested) as ArrayRef),
3244            ],
3245            Arc::new(Schema::new(vec![
3246                Field::new("col1", ArrowDataType::Int64, false),
3247                Field::new("col2", ArrowDataType::Int32, false),
3248                Field::new(
3249                    "nested",
3250                    ArrowDataType::Struct(supplied_nested_fields),
3251                    false,
3252                ),
3253            ])),
3254            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3255        );
3256    }
3257
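    /// Verifies that binary, large binary and binary view columns can be read as the
    /// corresponding UTF8 string types via a user-supplied schema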
3258    #[test]
3259    fn test_read_binary_as_utf8() {
3260        let file = write_parquet_from_iter(vec![
3261            (
3262                "binary_to_utf8",
3263                Arc::new(BinaryArray::from(vec![
3264                    b"one".as_ref(),
3265                    b"two".as_ref(),
3266                    b"three".as_ref(),
3267                ])) as ArrayRef,
3268            ),
3269            (
3270                "large_binary_to_large_utf8",
3271                Arc::new(LargeBinaryArray::from(vec![
3272                    b"one".as_ref(),
3273                    b"two".as_ref(),
3274                    b"three".as_ref(),
3275                ])) as ArrayRef,
3276            ),
3277            (
3278                "binary_view_to_utf8_view",
3279                Arc::new(BinaryViewArray::from(vec![
3280                    b"one".as_ref(),
3281                    b"two".as_ref(),
3282                    b"three".as_ref(),
3283                ])) as ArrayRef,
3284            ),
3285        ]);
3286        let supplied_fields = Fields::from(vec![
3287            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3288            Field::new(
3289                "large_binary_to_large_utf8",
3290                ArrowDataType::LargeUtf8,
3291                false,
3292            ),
3293            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3294        ]);
3295
3296        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3297        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3298            file.try_clone().unwrap(),
3299            options,
3300        )
3301        .expect("reader builder with schema")
3302        .build()
3303        .expect("reader with schema");
3304
3305        let batch = arrow_reader.next().unwrap().unwrap();
3306        assert_eq!(batch.num_columns(), 3);
3307        assert_eq!(batch.num_rows(), 3);
3308        assert_eq!(
3309            batch
3310                .column(0)
3311                .as_string::<i32>()
3312                .iter()
3313                .collect::<Vec<_>>(),
3314            vec![Some("one"), Some("two"), Some("three")]
3315        );
3316
3317        assert_eq!(
3318            batch
3319                .column(1)
3320                .as_string::<i64>()
3321                .iter()
3322                .collect::<Vec<_>>(),
3323            vec![Some("one"), Some("two"), Some("three")]
3324        );
3325
3326        assert_eq!(
3327            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3328            vec![Some("one"), Some("two"), Some("three")]
3329        );
3330    }
3331
3332    #[test]
3333    #[should_panic(expected = "Invalid UTF8 sequence at")]
3334    fn test_read_non_utf8_binary_as_utf8() {
3335        let file = write_parquet_from_iter(vec![(
3336            "non_utf8_binary",
3337            Arc::new(BinaryArray::from(vec![
3338                b"\xDE\x00\xFF".as_ref(),
3339                b"\xDE\x01\xAA".as_ref(),
3340                b"\xDE\x02\xFF".as_ref(),
3341            ])) as ArrayRef,
3342        )]);
3343        let supplied_fields = Fields::from(vec![Field::new(
3344            "non_utf8_binary",
3345            ArrowDataType::Utf8,
3346            false,
3347        )]);
3348
3349        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3350        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3351            file.try_clone().unwrap(),
3352            options,
3353        )
3354        .expect("reader builder with schema")
3355        .build()
3356        .expect("reader with schema");
3357        arrow_reader.next().unwrap().unwrap_err();
3358    }
3359
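    /// Verifies that a user-supplied schema can coerce columns on read, including
    /// timestamps with time zones, Date64, and dictionary-encoded strings in a nested struct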
3360    #[test]
3361    fn test_with_schema() {
3362        let nested_fields = Fields::from(vec![
3363            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3364            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3365        ]);
3366
3367        let nested_arrays: Vec<ArrayRef> = vec![
3368            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3369            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3370        ];
3371
3372        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3373
3374        let file = write_parquet_from_iter(vec![
3375            (
3376                "int32_to_ts_second",
3377                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3378            ),
3379            (
3380                "date32_to_date64",
3381                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3382            ),
3383            ("nested", Arc::new(nested) as ArrayRef),
3384        ]);
3385
3386        let supplied_nested_fields = Fields::from(vec![
3387            Field::new(
3388                "utf8_to_dict",
3389                ArrowDataType::Dictionary(
3390                    Box::new(ArrowDataType::Int32),
3391                    Box::new(ArrowDataType::Utf8),
3392                ),
3393                false,
3394            ),
3395            Field::new(
3396                "int64_to_ts_nano",
3397                ArrowDataType::Timestamp(
3398                    arrow::datatypes::TimeUnit::Nanosecond,
3399                    Some("+10:00".into()),
3400                ),
3401                false,
3402            ),
3403        ]);
3404
3405        let supplied_schema = Arc::new(Schema::new(vec![
3406            Field::new(
3407                "int32_to_ts_second",
3408                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3409                false,
3410            ),
3411            Field::new("date32_to_date64", ArrowDataType::Date64, false),
3412            Field::new(
3413                "nested",
3414                ArrowDataType::Struct(supplied_nested_fields),
3415                false,
3416            ),
3417        ]));
3418
3419        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3420        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3421            file.try_clone().unwrap(),
3422            options,
3423        )
3424        .expect("reader builder with schema")
3425        .build()
3426        .expect("reader with schema");
3427
3428        assert_eq!(arrow_reader.schema(), supplied_schema);
3429        let batch = arrow_reader.next().unwrap().unwrap();
3430        assert_eq!(batch.num_columns(), 3);
3431        assert_eq!(batch.num_rows(), 4);
3432        assert_eq!(
3433            batch
3434                .column(0)
3435                .as_any()
3436                .downcast_ref::<TimestampSecondArray>()
3437                .expect("downcast to timestamp second")
3438                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3439                .map(|v| v.to_string())
3440                .expect("value as datetime"),
3441            "1970-01-01 01:00:00 +01:00"
3442        );
3443        assert_eq!(
3444            batch
3445                .column(1)
3446                .as_any()
3447                .downcast_ref::<Date64Array>()
3448                .expect("downcast to date64")
3449                .value_as_date(0)
3450                .map(|v| v.to_string())
3451                .expect("value as date"),
3452            "1970-01-01"
3453        );
3454
3455        let nested = batch
3456            .column(2)
3457            .as_any()
3458            .downcast_ref::<StructArray>()
3459            .expect("downcast to struct");
3460
3461        let nested_dict = nested
3462            .column(0)
3463            .as_any()
3464            .downcast_ref::<Int32DictionaryArray>()
3465            .expect("downcast to dictionary");
3466
3467        assert_eq!(
3468            nested_dict
3469                .values()
3470                .as_any()
3471                .downcast_ref::<StringArray>()
3472                .expect("downcast to string")
3473                .iter()
3474                .collect::<Vec<_>>(),
3475            vec![Some("a"), Some("b")]
3476        );
3477
3478        assert_eq!(
3479            nested_dict.keys().iter().collect::<Vec<_>>(),
3480            vec![Some(0), Some(0), Some(0), Some(1)]
3481        );
3482
3483        assert_eq!(
3484            nested
3485                .column(1)
3486                .as_any()
3487                .downcast_ref::<TimestampNanosecondArray>()
3488                .expect("downcast to timestamp nanosecond")
3489                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3490                .map(|v| v.to_string())
3491                .expect("value as datetime"),
3492            "1970-01-01 10:00:00.000000001 +10:00"
3493        );
3494    }
3495
3496    #[test]
3497    fn test_empty_projection() {
3498        let testdata = arrow::util::test_util::parquet_test_data();
3499        let path = format!("{testdata}/alltypes_plain.parquet");
3500        let file = File::open(path).unwrap();
3501
3502        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3503        let file_metadata = builder.metadata().file_metadata();
3504        let expected_rows = file_metadata.num_rows() as usize;
3505
3506        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3507        let batch_reader = builder
3508            .with_projection(mask)
3509            .with_batch_size(2)
3510            .build()
3511            .unwrap();
3512
3513        let mut total_rows = 0;
3514        for maybe_batch in batch_reader {
3515            let batch = maybe_batch.unwrap();
3516            total_rows += batch.num_rows();
3517            assert_eq!(batch.num_columns(), 0);
3518            assert!(batch.num_rows() <= 2);
3519        }
3520
3521        assert_eq!(total_rows, expected_rows);
3522    }
3523
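    /// Writes two record batches of `batch_size` list rows with the given maximum row group
    /// size and verifies that reading them back yields full batches of `batch_size` rows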
3524    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3525        let schema = Arc::new(Schema::new(vec![Field::new(
3526            "list",
3527            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3528            true,
3529        )]));
3530
3531        let mut buf = Vec::with_capacity(1024);
3532
3533        let mut writer = ArrowWriter::try_new(
3534            &mut buf,
3535            schema.clone(),
3536            Some(
3537                WriterProperties::builder()
3538                    .set_max_row_group_size(row_group_size)
3539                    .build(),
3540            ),
3541        )
3542        .unwrap();
3543        for _ in 0..2 {
3544            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3545            for _ in 0..(batch_size) {
3546                list_builder.append(true);
3547            }
3548            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3549                .unwrap();
3550            writer.write(&batch).unwrap();
3551        }
3552        writer.close().unwrap();
3553
3554        let mut record_reader =
3555            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3556        assert_eq!(
3557            batch_size,
3558            record_reader.next().unwrap().unwrap().num_rows()
3559        );
3560        assert_eq!(
3561            batch_size,
3562            record_reader.next().unwrap().unwrap().num_rows()
3563        );
3564    }
3565
3566    #[test]
3567    fn test_row_group_exact_multiple() {
3568        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3569        test_row_group_batch(8, 8);
3570        test_row_group_batch(10, 8);
3571        test_row_group_batch(8, 10);
3572        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3573        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3574        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3575        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3576        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3577    }
3578
3579    /// Given a RecordBatch containing all the column data, return the expected batches given
3580    /// a `batch_size` and `selection`
3581    fn get_expected_batches(
3582        column: &RecordBatch,
3583        selection: &RowSelection,
3584        batch_size: usize,
3585    ) -> Vec<RecordBatch> {
3586        let mut expected_batches = vec![];
3587
3588        let mut selection: VecDeque<_> = selection.clone().into();
3589        let mut row_offset = 0;
3590        let mut last_start = None;
3591        while row_offset < column.num_rows() && !selection.is_empty() {
3592            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3593            while batch_remaining > 0 && !selection.is_empty() {
3594                let (to_read, skip) = match selection.front_mut() {
3595                    Some(selection) if selection.row_count > batch_remaining => {
3596                        selection.row_count -= batch_remaining;
3597                        (batch_remaining, selection.skip)
3598                    }
3599                    Some(_) => {
3600                        let select = selection.pop_front().unwrap();
3601                        (select.row_count, select.skip)
3602                    }
3603                    None => break,
3604                };
3605
3606                batch_remaining -= to_read;
3607
3608                match skip {
3609                    true => {
3610                        if let Some(last_start) = last_start.take() {
3611                            expected_batches.push(column.slice(last_start, row_offset - last_start))
3612                        }
3613                        row_offset += to_read
3614                    }
3615                    false => {
3616                        last_start.get_or_insert(row_offset);
3617                        row_offset += to_read
3618                    }
3619                }
3620            }
3621        }
3622
3623        if let Some(last_start) = last_start.take() {
3624            expected_batches.push(column.slice(last_start, row_offset - last_start))
3625        }
3626
        // Sanity check: all batches except the final one should contain `batch_size` rows
3628        for batch in &expected_batches[..expected_batches.len() - 1] {
3629            assert_eq!(batch.num_rows(), batch_size);
3630        }
3631
3632        expected_batches
3633    }
3634
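    /// Builds a `RowSelection` of alternating select/skip runs of `step_len` rows covering
    /// `total_len` rows, starting with a skip when `skip_first` is set, and returns it
    /// along with the number of selected rows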
3635    fn create_test_selection(
3636        step_len: usize,
3637        total_len: usize,
3638        skip_first: bool,
3639    ) -> (RowSelection, usize) {
3640        let mut remaining = total_len;
3641        let mut skip = skip_first;
3642        let mut vec = vec![];
3643        let mut selected_count = 0;
3644        while remaining != 0 {
3645            let step = if remaining > step_len {
3646                step_len
3647            } else {
3648                remaining
3649            };
3650            vec.push(RowSelector {
3651                row_count: step,
3652                skip,
3653            });
3654            remaining -= step;
3655            if !skip {
3656                selected_count += step;
3657            }
3658            skip = !skip;
3659        }
3660        (vec.into(), selected_count)
3661    }
3662
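    /// Reads `alltypes_tiny_pages_plain.parquet` with alternating select/skip selections
    /// and compares the result against slicing the fully materialized data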
3663    #[test]
3664    fn test_scan_row_with_selection() {
3665        let testdata = arrow::util::test_util::parquet_test_data();
3666        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3667        let test_file = File::open(&path).unwrap();
3668
3669        let mut serial_reader =
3670            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3671        let data = serial_reader.next().unwrap().unwrap();
3672
3673        let do_test = |batch_size: usize, selection_len: usize| {
3674            for skip_first in [false, true] {
3675                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3676
3677                let expected = get_expected_batches(&data, &selections, batch_size);
3678                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3679                assert_eq!(
3680                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3681                    expected,
3682                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3683                );
3684            }
3685        };
3686
3687        // total row count 7300
3688        // 1. test selection len more than one page row count
3689        do_test(1000, 1000);
3690
3691        // 2. test selection len less than one page row count
3692        do_test(20, 20);
3693
3694        // 3. test selection_len less than batch_size
3695        do_test(20, 5);
3696
        // 4. test selection_len more than batch_size (batch_size < selection_len)
        do_test(5, 20);
3700
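        // Builds a reader over `test_file` with the page index enabled, using the given
        // batch size and row selection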
3701        fn create_skip_reader(
3702            test_file: &File,
3703            batch_size: usize,
3704            selections: RowSelection,
3705        ) -> ParquetRecordBatchReader {
3706            let options = ArrowReaderOptions::new().with_page_index(true);
3707            let file = test_file.try_clone().unwrap();
3708            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3709                .unwrap()
3710                .with_batch_size(batch_size)
3711                .with_row_selection(selections)
3712                .build()
3713                .unwrap()
3714        }
3715    }
3716
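    /// Verifies that an over-large batch size is clamped to the number of rows in the file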
3717    #[test]
3718    fn test_batch_size_overallocate() {
3719        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only has 8 rows
3721        let path = format!("{testdata}/alltypes_plain.parquet");
3722        let test_file = File::open(path).unwrap();
3723
3724        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3725        let num_rows = builder.metadata.file_metadata().num_rows();
3726        let reader = builder
3727            .with_batch_size(1024)
3728            .with_projection(ProjectionMask::all())
3729            .build()
3730            .unwrap();
3731        assert_ne!(1024, num_rows);
3732        assert_eq!(reader.batch_size, num_rows as usize);
3733    }
3734
3735    #[test]
3736    fn test_read_with_page_index_enabled() {
3737        let testdata = arrow::util::test_util::parquet_test_data();
3738
3739        {
3740            // `alltypes_tiny_pages.parquet` has page index
3741            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3742            let test_file = File::open(path).unwrap();
3743            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3744                test_file,
3745                ArrowReaderOptions::new().with_page_index(true),
3746            )
3747            .unwrap();
3748            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3749            let reader = builder.build().unwrap();
3750            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3751            assert_eq!(batches.len(), 8);
3752        }
3753
3754        {
3755            // `alltypes_plain.parquet` doesn't have page index
3756            let path = format!("{testdata}/alltypes_plain.parquet");
3757            let test_file = File::open(path).unwrap();
3758            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3759                test_file,
3760                ArrowReaderOptions::new().with_page_index(true),
3761            )
3762            .unwrap();
            // Although the `Vec<Vec<PageLocation>>` for each row group is empty,
            // the file should still be read successfully.
3765            assert!(builder.metadata().offset_index().is_none());
3766            let reader = builder.build().unwrap();
3767            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3768            assert_eq!(batches.len(), 1);
3769        }
3770    }
3771
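    /// Writes optional and repeated INT32 columns with the low-level SerializedFileWriter
    /// and verifies that each leaf column can be projected individually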
3772    #[test]
3773    fn test_raw_repetition() {
3774        const MESSAGE_TYPE: &str = "
3775            message Log {
3776              OPTIONAL INT32 eventType;
3777              REPEATED INT32 category;
3778              REPEATED group filter {
3779                OPTIONAL INT32 error;
3780              }
3781            }
3782        ";
3783        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
3784        let props = Default::default();
3785
3786        let mut buf = Vec::with_capacity(1024);
3787        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
3788        let mut row_group_writer = writer.next_row_group().unwrap();
3789
3790        // column 0
3791        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3792        col_writer
3793            .typed::<Int32Type>()
3794            .write_batch(&[1], Some(&[1]), None)
3795            .unwrap();
3796        col_writer.close().unwrap();
3797        // column 1
3798        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3799        col_writer
3800            .typed::<Int32Type>()
3801            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
3802            .unwrap();
3803        col_writer.close().unwrap();
3804        // column 2
3805        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3806        col_writer
3807            .typed::<Int32Type>()
3808            .write_batch(&[1], Some(&[1]), Some(&[0]))
3809            .unwrap();
3810        col_writer.close().unwrap();
3811
3812        let rg_md = row_group_writer.close().unwrap();
3813        assert_eq!(rg_md.num_rows(), 1);
3814        writer.close().unwrap();
3815
3816        let bytes = Bytes::from(buf);
3817
3818        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
3819        let full = no_mask.next().unwrap().unwrap();
3820
3821        assert_eq!(full.num_columns(), 3);
3822
3823        for idx in 0..3 {
3824            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
3825            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
3826            let mut reader = b.with_projection(mask).build().unwrap();
3827            let projected = reader.next().unwrap().unwrap();
3828
3829            assert_eq!(projected.num_columns(), 1);
3830            assert_eq!(full.column(idx), projected.column(0));
3831        }
3832    }
3833
3834    #[test]
3835    fn test_read_lz4_raw() {
3836        let testdata = arrow::util::test_util::parquet_test_data();
3837        let path = format!("{testdata}/lz4_raw_compressed.parquet");
3838        let file = File::open(path).unwrap();
3839
3840        let batches = ParquetRecordBatchReader::try_new(file, 1024)
3841            .unwrap()
3842            .collect::<Result<Vec<_>, _>>()
3843            .unwrap();
3844        assert_eq!(batches.len(), 1);
3845        let batch = &batches[0];
3846
3847        assert_eq!(batch.num_columns(), 3);
3848        assert_eq!(batch.num_rows(), 4);
3849
3850        // https://github.com/apache/parquet-testing/pull/18
3851        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3852        assert_eq!(
3853            a.values(),
3854            &[1593604800, 1593604800, 1593604801, 1593604801]
3855        );
3856
3857        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3858        let a: Vec<_> = a.iter().flatten().collect();
3859        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
3860
3861        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3862        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
3863    }
3864
    // This test ensures backward compatibility by reading 2 files that both use the LZ4
    // CompressionCodec but with different algorithms: LZ4_HADOOP and LZ4_RAW.
    // 1. hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
    //    LZ4_HADOOP algorithm for compression.
    // 2. non_hadoop_lz4_compressed.parquet -> a file with the LZ4 CompressionCodec that uses the
    //    LZ4_RAW algorithm for compression. This fallback is kept for backward compatibility with
    //    older parquet-cpp versions.
3872    //
3873    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
3874    #[test]
3875    fn test_read_lz4_hadoop_fallback() {
3876        for file in [
3877            "hadoop_lz4_compressed.parquet",
3878            "non_hadoop_lz4_compressed.parquet",
3879        ] {
3880            let testdata = arrow::util::test_util::parquet_test_data();
3881            let path = format!("{testdata}/{file}");
3882            let file = File::open(path).unwrap();
3883            let expected_rows = 4;
3884
3885            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3886                .unwrap()
3887                .collect::<Result<Vec<_>, _>>()
3888                .unwrap();
3889            assert_eq!(batches.len(), 1);
3890            let batch = &batches[0];
3891
3892            assert_eq!(batch.num_columns(), 3);
3893            assert_eq!(batch.num_rows(), expected_rows);
3894
3895            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3896            assert_eq!(
3897                a.values(),
3898                &[1593604800, 1593604800, 1593604801, 1593604801]
3899            );
3900
3901            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3902            let b: Vec<_> = b.iter().flatten().collect();
3903            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
3904
3905            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3906            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
3907        }
3908    }
3909
3910    #[test]
3911    fn test_read_lz4_hadoop_large() {
3912        let testdata = arrow::util::test_util::parquet_test_data();
3913        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
3914        let file = File::open(path).unwrap();
3915        let expected_rows = 10000;
3916
3917        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3918            .unwrap()
3919            .collect::<Result<Vec<_>, _>>()
3920            .unwrap();
3921        assert_eq!(batches.len(), 1);
3922        let batch = &batches[0];
3923
3924        assert_eq!(batch.num_columns(), 1);
3925        assert_eq!(batch.num_rows(), expected_rows);
3926
3927        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
3928        let a: Vec<_> = a.iter().flatten().collect();
3929        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
3930        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
3931        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
3932        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
3933    }
3934
3935    #[test]
3936    #[cfg(feature = "snap")]
3937    fn test_read_nested_lists() {
3938        let testdata = arrow::util::test_util::parquet_test_data();
3939        let path = format!("{testdata}/nested_lists.snappy.parquet");
3940        let file = File::open(path).unwrap();
3941
3942        let f = file.try_clone().unwrap();
3943        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
3944        let expected = reader.next().unwrap().unwrap();
3945        assert_eq!(expected.num_rows(), 3);
3946
3947        let selection = RowSelection::from(vec![
3948            RowSelector::skip(1),
3949            RowSelector::select(1),
3950            RowSelector::skip(1),
3951        ]);
3952        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
3953            .unwrap()
3954            .with_row_selection(selection)
3955            .build()
3956            .unwrap();
3957
3958        let actual = reader.next().unwrap().unwrap();
3959        assert_eq!(actual.num_rows(), 1);
3960        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
3961    }
3962
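    /// Round-trips Decimal128 columns with precisions 19, 12 and 17 through parquet and
    /// verifies the values are unchanged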
3963    #[test]
3964    fn test_arbitrary_decimal() {
3965        let values = [1, 2, 3, 4, 5, 6, 7, 8];
3966        let decimals_19_0 = Decimal128Array::from_iter_values(values)
3967            .with_precision_and_scale(19, 0)
3968            .unwrap();
3969        let decimals_12_0 = Decimal128Array::from_iter_values(values)
3970            .with_precision_and_scale(12, 0)
3971            .unwrap();
3972        let decimals_17_10 = Decimal128Array::from_iter_values(values)
3973            .with_precision_and_scale(17, 10)
3974            .unwrap();
3975
3976        let written = RecordBatch::try_from_iter([
3977            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
3978            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
3979            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
3980        ])
3981        .unwrap();
3982
3983        let mut buffer = Vec::with_capacity(1024);
3984        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
3985        writer.write(&written).unwrap();
3986        writer.close().unwrap();
3987
3988        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
3989            .unwrap()
3990            .collect::<Result<Vec<_>, _>>()
3991            .unwrap();
3992
3993        assert_eq!(&written.slice(0, 8), &read[0]);
3994    }
3995
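    /// Verifies that skipping rows works for a list column where a data page contains
    /// more values than rows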
3996    #[test]
3997    fn test_list_skip() {
3998        let mut list = ListBuilder::new(Int32Builder::new());
3999        list.append_value([Some(1), Some(2)]);
4000        list.append_value([Some(3)]);
4001        list.append_value([Some(4)]);
4002        let list = list.finish();
4003        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4004
4005        // First page contains 2 values but only 1 row
4006        let props = WriterProperties::builder()
4007            .set_data_page_row_count_limit(1)
4008            .set_write_batch_size(2)
4009            .build();
4010
4011        let mut buffer = Vec::with_capacity(1024);
4012        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4013        writer.write(&batch).unwrap();
4014        writer.close().unwrap();
4015
4016        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4017        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4018            .unwrap()
4019            .with_row_selection(selection.into())
4020            .build()
4021            .unwrap();
4022        let out = reader.next().unwrap().unwrap();
4023        assert_eq!(out.num_rows(), 1);
4024        assert_eq!(out, batch.slice(2, 1));
4025    }
4026
4027    fn test_decimal_roundtrip<T: DecimalType>() {
4028        // Precision <= 9 -> INT32
4029        // Precision <= 18 -> INT64
4030        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4031
4032        let d = |values: Vec<usize>, p: u8| {
4033            let iter = values.into_iter().map(T::Native::usize_as);
4034            PrimitiveArray::<T>::from_iter_values(iter)
4035                .with_precision_and_scale(p, 2)
4036                .unwrap()
4037        };
4038
4039        let d1 = d(vec![1, 2, 3, 4, 5], 9);
4040        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4041        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4042        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4043
4044        let batch = RecordBatch::try_from_iter([
4045            ("d1", Arc::new(d1) as ArrayRef),
4046            ("d2", Arc::new(d2) as ArrayRef),
4047            ("d3", Arc::new(d3) as ArrayRef),
4048            ("d4", Arc::new(d4) as ArrayRef),
4049        ])
4050        .unwrap();
4051
4052        let mut buffer = Vec::with_capacity(1024);
4053        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4054        writer.write(&batch).unwrap();
4055        writer.close().unwrap();
4056
4057        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4058        let t1 = builder.parquet_schema().columns()[0].physical_type();
4059        assert_eq!(t1, PhysicalType::INT32);
4060        let t2 = builder.parquet_schema().columns()[1].physical_type();
4061        assert_eq!(t2, PhysicalType::INT64);
4062        let t3 = builder.parquet_schema().columns()[2].physical_type();
4063        assert_eq!(t3, PhysicalType::INT64);
4064        let t4 = builder.parquet_schema().columns()[3].physical_type();
4065        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4066
4067        let mut reader = builder.build().unwrap();
4068        assert_eq!(batch.schema(), reader.schema());
4069
4070        let out = reader.next().unwrap().unwrap();
4071        assert_eq!(batch, out);
4072    }
4073
4074    #[test]
4075    fn test_decimal() {
4076        test_decimal_roundtrip::<Decimal128Type>();
4077        test_decimal_roundtrip::<Decimal256Type>();
4078    }
4079
4080    #[test]
4081    fn test_list_selection() {
4082        let schema = Arc::new(Schema::new(vec![Field::new_list(
4083            "list",
4084            Field::new_list_field(ArrowDataType::Utf8, true),
4085            false,
4086        )]));
4087        let mut buf = Vec::with_capacity(1024);
4088
4089        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4090
4091        for i in 0..2 {
4092            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4093            for j in 0..1024 {
4094                list_a_builder.values().append_value(format!("{i} {j}"));
4095                list_a_builder.append(true);
4096            }
4097            let batch =
4098                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4099                    .unwrap();
4100            writer.write(&batch).unwrap();
4101        }
4102        let _metadata = writer.close().unwrap();
4103
4104        let buf = Bytes::from(buf);
4105        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4106            .unwrap()
4107            .with_row_selection(RowSelection::from(vec![
4108                RowSelector::skip(100),
4109                RowSelector::select(924),
4110                RowSelector::skip(100),
4111                RowSelector::select(924),
4112            ]))
4113            .build()
4114            .unwrap();
4115
4116        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4117        let batch = concat_batches(&schema, &batches).unwrap();
4118
4119        assert_eq!(batch.num_rows(), 924 * 2);
4120        let list = batch.column(0).as_list::<i32>();
4121
4122        for w in list.value_offsets().windows(2) {
4123            assert_eq!(w[0] + 1, w[1])
4124        }
4125        let mut values = list.values().as_string::<i32>().iter();
4126
4127        for i in 0..2 {
4128            for j in 100..1024 {
4129                let expected = format!("{i} {j}");
4130                assert_eq!(values.next().unwrap().unwrap(), &expected);
4131            }
4132        }
4133    }
4134
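    /// Randomly generates nested list data and checks that various row selections and
    /// batch sizes return the same rows as slicing the original batch directly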
4135    #[test]
4136    fn test_list_selection_fuzz() {
4137        let mut rng = thread_rng();
4138        let schema = Arc::new(Schema::new(vec![Field::new_list(
4139            "list",
4140            Field::new_list(
4141                Field::LIST_FIELD_DEFAULT_NAME,
4142                Field::new_list_field(ArrowDataType::Int32, true),
4143                true,
4144            ),
4145            true,
4146        )]));
4147        let mut buf = Vec::with_capacity(1024);
4148        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4149
4150        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4151
4152        for _ in 0..2048 {
4153            if rng.gen_bool(0.2) {
4154                list_a_builder.append(false);
4155                continue;
4156            }
4157
4158            let list_a_len = rng.gen_range(0..10);
4159            let list_b_builder = list_a_builder.values();
4160
4161            for _ in 0..list_a_len {
4162                if rng.gen_bool(0.2) {
4163                    list_b_builder.append(false);
4164                    continue;
4165                }
4166
4167                let list_b_len = rng.gen_range(0..10);
4168                let int_builder = list_b_builder.values();
4169                for _ in 0..list_b_len {
4170                    match rng.gen_bool(0.2) {
4171                        true => int_builder.append_null(),
4172                        false => int_builder.append_value(rng.gen()),
4173                    }
4174                }
4175                list_b_builder.append(true)
4176            }
4177            list_a_builder.append(true);
4178        }
4179
4180        let array = Arc::new(list_a_builder.finish());
4181        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4182
4183        writer.write(&batch).unwrap();
4184        let _metadata = writer.close().unwrap();
4185
4186        let buf = Bytes::from(buf);
4187
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

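        // Run every selection with several batch sizes so that record batch boundaries
        // fall both inside and across the selected runs.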
        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

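                // Walk the selection, comparing every selected run against the
                // corresponding slice of the filtered output.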
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        // message my_record {
        //     REQUIRED group a (LIST) {
        //         REPEATED group array (LIST) {
        //             REPEATED INT32 array;
        //         }
        //     }
        // }
        // should be read as list<list<int32>>
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // create expected ListArray
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Construct a buffer for value offsets, for the nested array: [[1, 2], [3, 4]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

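        // Read the file back and check that its single row matches the expected
        // nested value [[1, 2], [3, 4]].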
        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // grab first column
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        // get first row: [[1, 2], [3, 4]]
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        // File schema:
        // message schema {
        //   required group my_map (MAP) {
        //     repeated group key_value {
        //       required int32 key;
        //       optional int32 value;
        //     }
        //   }
        //   required group my_map_no_v (MAP) {
        //     repeated group key_value {
        //       required int32 key;
        //     }
        //   }
        //   required group my_list (LIST) {
        //     repeated group list {
        //       required int32 element;
        //     }
        //   }
        // }
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

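        // Reading must succeed even though the key_value group of my_map_no_v carries
        // no value field.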
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        // A MAP group without a value field is read as a list of its keys, so the
        // my_map_no_v and my_list columns should be equivalent
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }
}