Skip to main content

parquet/arrow/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! API for reading/writing Arrow [`RecordBatch`]es and [`Array`]s to/from
19//! Parquet Files.
20//!
21//! See the [crate-level documentation](crate) for more details on other APIs
22//!
23//! # Schema Conversion
24//!
25//! These APIs ensure that data in Arrow [`RecordBatch`]es written to Parquet are
26//! read back as [`RecordBatch`]es with the exact same types and values.
27//!
28//! Parquet and Arrow have different type systems, and there is not
29//! always a one to one mapping between the systems. For example, data
30//! stored as a Parquet [`BYTE_ARRAY`] can be read as either an Arrow
31//! [`BinaryViewArray`] or [`BinaryArray`].
32//!
33//! To recover the original Arrow types, the writers in this module add a "hint" to
34//! the metadata in the [`ARROW_SCHEMA_META_KEY`] key which records the original Arrow
35//! schema. The metadata hint follows the same convention as arrow-cpp based
36//! implementations such as `pyarrow`. The reader looks for the schema hint in the
37//! metadata to determine Arrow types, and if it is not present, infers the Arrow schema
38//! from the Parquet schema.
39//!
40//! In situations where the embedded Arrow schema is not compatible with the Parquet
41//! schema, the Parquet schema takes precedence and no error is raised.
42//! See [#1663](https://github.com/apache/arrow-rs/issues/1663)
43//!
44//! You can also control the type conversion process in more detail using:
45//!
//! * [`ArrowSchemaConverter`] to control the conversion of Arrow types to Parquet
47//!   types.
48//!
49//! * [`ArrowReaderOptions::with_schema`] to explicitly specify your own Arrow schema hint
50//!   to use when reading Parquet, overriding any metadata that may be present.
51//!
52//! [`RecordBatch`]: arrow_array::RecordBatch
53//! [`Array`]: arrow_array::Array
54//! [`BYTE_ARRAY`]: crate::basic::Type::BYTE_ARRAY
55//! [`BinaryViewArray`]: arrow_array::BinaryViewArray
56//! [`BinaryArray`]: arrow_array::BinaryArray
57//! [`ArrowReaderOptions::with_schema`]: arrow_reader::ArrowReaderOptions::with_schema
58//!
59//! # Example: Writing Arrow `RecordBatch` to Parquet file
60//!
61//!```rust
62//! # use arrow_array::{Int32Array, ArrayRef};
63//! # use arrow_array::RecordBatch;
64//! # use parquet::arrow::arrow_writer::ArrowWriter;
65//! # use parquet::file::properties::WriterProperties;
66//! # use tempfile::tempfile;
67//! # use std::sync::Arc;
68//! # use parquet::basic::Compression;
69//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
70//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
71//! let batch = RecordBatch::try_from_iter(vec![
72//!   ("id", Arc::new(ids) as ArrayRef),
73//!   ("val", Arc::new(vals) as ArrayRef),
74//! ]).unwrap();
75//!
76//! let file = tempfile().unwrap();
77//!
78//! // WriterProperties can be used to set Parquet file options
79//! let props = WriterProperties::builder()
80//!     .set_compression(Compression::SNAPPY)
81//!     .build();
82//!
83//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
84//!
85//! writer.write(&batch).expect("Writing batch");
86//!
87//! // writer must be closed to write footer
88//! writer.close().unwrap();
89//! ```
90//!
91//! # Example: Reading Parquet file into Arrow `RecordBatch`
92//!
93//! ```rust
94//! # use std::fs::File;
95//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
96//! # use std::sync::Arc;
97//! # use arrow_array::Int32Array;
98//! # use arrow::datatypes::{DataType, Field, Schema};
99//! # use arrow_array::RecordBatch;
100//! # use parquet::arrow::arrow_writer::ArrowWriter;
101//! #
102//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
103//! # let schema = Arc::new(Schema::new(vec![
104//! #     Field::new("id", DataType::Int32, false),
105//! # ]));
106//! #
107//! # let file = File::create("data.parquet").unwrap();
108//! #
109//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
110//! # let batches = vec![batch];
111//! #
112//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
113//! #
114//! # for batch in batches {
115//! #     writer.write(&batch).expect("Writing batch");
116//! # }
117//! # writer.close().unwrap();
118//! #
119//! let file = File::open("data.parquet").unwrap();
120//!
121//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
122//! println!("Converted arrow schema is: {}", builder.schema());
123//!
124//! let mut reader = builder.build().unwrap();
125//!
126//! let record_batch = reader.next().unwrap().unwrap();
127//!
128//! println!("Read {} records.", record_batch.num_rows());
129//! ```
130//!
131//! # Example: Reading non-uniformly encrypted parquet file into arrow record batch
132//!
133//! Note: This requires the experimental `encryption` feature to be enabled at compile time.
134//!
135#![cfg_attr(feature = "encryption", doc = "```rust")]
136#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
137//! # use arrow_array::{Int32Array, ArrayRef};
138//! # use arrow_array::{types, RecordBatch};
139//! # use parquet::arrow::arrow_reader::{
140//! #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
141//! # };
142//! # use arrow_array::cast::AsArray;
143//! # use parquet::file::metadata::ParquetMetaData;
144//! # use tempfile::tempfile;
145//! # use std::fs::File;
146//! # use parquet::encryption::decrypt::FileDecryptionProperties;
147//! # let test_data = arrow::util::test_util::parquet_test_data();
148//! # let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
149//! #
150//! let file = File::open(path).unwrap();
151//!
//! // Define the AES encryption keys required for decrypting the footer metadata
//! // and column-specific data. If only a footer key is used then it is assumed that the
//! // file uses uniform encryption and all columns are encrypted with the footer key.
//! // If any column keys are specified, other columns without a key provided are assumed
//! // to be unencrypted.
157//! let footer_key = "0123456789012345".as_bytes(); // Keys are 128 bits (16 bytes)
158//! let column_1_key = "1234567890123450".as_bytes();
159//! let column_2_key = "1234567890123451".as_bytes();
160//!
161//! let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
162//!     .with_column_key("double_field", column_1_key.to_vec())
163//!     .with_column_key("float_field", column_2_key.to_vec())
164//!     .build()
165//!     .unwrap();
166//!
167//! let options = ArrowReaderOptions::default()
168//!  .with_file_decryption_properties(decryption_properties);
169//! let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
170//! let file_metadata = reader_metadata.metadata().file_metadata();
171//! assert_eq!(50, file_metadata.num_rows());
172//!
173//! let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
174//!   .unwrap()
175//!   .build()
176//!   .unwrap();
177//!
178//! let record_batch = reader.next().unwrap().unwrap();
179//! assert_eq!(50, record_batch.num_rows());
180//! ```
181
182experimental!(mod array_reader);
183pub mod arrow_reader;
184pub mod arrow_writer;
185mod buffer;
186mod decoder;
187
188#[cfg(feature = "async")]
189pub mod async_reader;
190#[cfg(feature = "async")]
191pub mod async_writer;
192
193pub mod push_decoder;
194
195mod in_memory_row_group;
196mod record_reader;
197
198experimental!(mod schema);
199
200use std::fmt::Debug;
201
202pub use self::arrow_writer::ArrowWriter;
203#[cfg(feature = "async")]
204pub use self::async_reader::ParquetRecordBatchStreamBuilder;
205#[cfg(feature = "async")]
206pub use self::async_writer::AsyncArrowWriter;
207use crate::schema::types::SchemaDescriptor;
208use arrow_schema::{FieldRef, Schema};
209
210pub use self::schema::{
211    ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
212    parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual,
213    parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*,
214};
215
/// Metadata key under which the serialized Arrow schema is stored in Parquet files
///
/// The stored value is the Arrow schema serialized with the Arrow IPC format and
/// then base64 encoded, the same representation produced by arrow-cpp based
/// systems such as pyarrow.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
221
/// Key looked up in [`Field::metadata`]; when present, its value is used to
/// populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
228
/// A [`ProjectionMask`] describes which columns of a (possibly nested) schema to project
///
/// A mask can be built from a list of leaf column indices or of root column indices:
///
/// * Root columns are the immediate children of the schema root, numbered in order
/// * Leaf columns are the columns without children, numbered in depth-first order
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean         leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// When a schema contains only primitive columns (no nesting), its roots and
/// leaves are identical
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ProjectionMask {
    /// Per-leaf selection flags
    ///
    /// When `Some`, the leaf column at index `i` is selected exactly when
    /// element `i` is `true`. When `None`, every column is selected.
    ///
    /// # Examples
    ///
    /// For a parquet schema whose leaf columns are `[a, b, c, d]`:
    ///
    /// A mask of `[true, false, true, false]` produces a schema with 2 elements:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `c`
    ///
    /// A mask of `None` produces a schema with 4 elements:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `b`
    /// * `fields[2]`: `c`
    /// * `fields[3]`: `d`
    mask: Option<Vec<bool>>,
}
277
278impl ProjectionMask {
279    /// Create a [`ProjectionMask`] which selects all columns
280    pub fn all() -> Self {
281        Self { mask: None }
282    }
283
284    /// Create a [`ProjectionMask`] which selects no columns
285    pub fn none(len: usize) -> Self {
286        Self {
287            mask: Some(vec![false; len]),
288        }
289    }
290
291    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
292    ///
293    /// Note: repeated or out of order indices will not impact the final mask
294    ///
295    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
296    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
297        let mut mask = vec![false; schema.num_columns()];
298        for leaf_idx in indices {
299            mask[leaf_idx] = true;
300        }
301        Self { mask: Some(mask) }
302    }
303
304    /// Create a [`ProjectionMask`] which selects only the specified root columns
305    ///
306    /// Note: repeated or out of order indices will not impact the final mask
307    ///
308    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
309    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
310        let num_root_columns = schema.root_schema().get_fields().len();
311        let mut root_mask = vec![false; num_root_columns];
312        for root_idx in indices {
313            root_mask[root_idx] = true;
314        }
315
316        let mask = (0..schema.num_columns())
317            .map(|leaf_idx| {
318                let root_idx = schema.get_column_root_idx(leaf_idx);
319                root_mask[root_idx]
320            })
321            .collect();
322
323        Self { mask: Some(mask) }
324    }
325
326    /// Create a [`ProjectionMask`] which selects only the named columns
327    ///
328    /// All leaf columns that fall below a given name will be selected. For example, given
329    /// the schema
330    /// ```ignore
331    /// message schema {
332    ///   OPTIONAL group a (MAP) {
333    ///     REPEATED group key_value {
334    ///       REQUIRED BYTE_ARRAY key (UTF8);  // leaf index 0
335    ///       OPTIONAL group value (MAP) {
336    ///         REPEATED group key_value {
337    ///           REQUIRED INT32 key;          // leaf index 1
338    ///           REQUIRED BOOLEAN value;      // leaf index 2
339    ///         }
340    ///       }
341    ///     }
342    ///   }
343    ///   REQUIRED INT32 b;                    // leaf index 3
344    ///   REQUIRED DOUBLE c;                   // leaf index 4
345    /// }
346    /// ```
347    /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return
348    /// columns 0, 1, and 2.
349    ///
350    /// Note: repeated or out of order indices will not impact the final mask.
351    ///
352    /// i.e. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`.
353    ///
354    /// Also, this will not produce the desired results if a column contains a '.' in its name.
355    /// Use [`Self::leaves`] or [`Self::roots`] in that case.
356    pub fn columns<'a>(
357        schema: &SchemaDescriptor,
358        names: impl IntoIterator<Item = &'a str>,
359    ) -> Self {
360        let mut mask = vec![false; schema.num_columns()];
361        for name in names {
362            let name_path: Vec<&str> = name.split('.').collect();
363            for (idx, col) in schema.columns().iter().enumerate() {
364                let path = col.path().parts();
365                // searching for "a.b.c" cannot match "a.b"
366                if name_path.len() > path.len() {
367                    continue;
368                }
369                // now path >= name_path, so check that each element in name_path matches
370                if name_path.iter().zip(path.iter()).all(|(a, b)| a == b) {
371                    mask[idx] = true;
372                }
373            }
374        }
375
376        Self { mask: Some(mask) }
377    }
378
379    /// Returns true if the leaf column `leaf_idx` is included by the mask
380    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
381        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
382    }
383
384    /// Union two projection masks
385    ///
386    /// Example:
387    /// ```text
388    /// mask1 = [true, false, true]
389    /// mask2 = [false, true, true]
390    /// union(mask1, mask2) = [true, true, true]
391    /// ```
392    pub fn union(&mut self, other: &Self) {
393        match (self.mask.as_ref(), other.mask.as_ref()) {
394            (None, _) | (_, None) => self.mask = None,
395            (Some(a), Some(b)) => {
396                debug_assert_eq!(a.len(), b.len());
397                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
398                self.mask = Some(mask);
399            }
400        }
401    }
402
403    /// Intersect two projection masks
404    ///
405    /// Example:
406    /// ```text
407    /// mask1 = [true, false, true]
408    /// mask2 = [false, true, true]
409    /// intersect(mask1, mask2) = [false, false, true]
410    /// ```
411    pub fn intersect(&mut self, other: &Self) {
412        match (self.mask.as_ref(), other.mask.as_ref()) {
413            (None, _) => self.mask = other.mask.clone(),
414            (_, None) => {}
415            (Some(a), Some(b)) => {
416                debug_assert_eq!(a.len(), b.len());
417                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
418                self.mask = Some(mask);
419            }
420        }
421    }
422
423    /// Return a new [`ProjectionMask`] that excludes any leaf columns that are
424    /// part of a nested type, such as struct, list, or map
425    ///
426    /// If there are no non-nested columns in the mask, returns `None`
427    pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option<Self> {
428        let num_leaves = schema.num_columns();
429
430        // Count how many leaves each root column has
431        let num_roots = schema.root_schema().get_fields().len();
432        let mut root_leaf_counts = vec![0usize; num_roots];
433        for leaf_idx in 0..num_leaves {
434            let root_idx = schema.get_column_root_idx(leaf_idx);
435            root_leaf_counts[root_idx] += 1;
436        }
437
438        // Cache only top-level primitive columns.
439        // Even a one-leaf group is nested; caching it drops parent def levels.
440        let mut included_leaves = Vec::new();
441        for leaf_idx in 0..num_leaves {
442            if self.leaf_included(leaf_idx) {
443                let root = schema.get_column_root(leaf_idx);
444                let root_idx = schema.get_column_root_idx(leaf_idx);
445                if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
446                    included_leaves.push(leaf_idx);
447                }
448            }
449        }
450
451        if included_leaves.is_empty() {
452            None
453        } else {
454            Some(ProjectionMask::leaves(schema, included_leaves))
455        }
456    }
457}
458
459/// Lookups up the parquet column by name
460///
461/// Returns the parquet column index and the corresponding arrow field
462pub fn parquet_column<'a>(
463    parquet_schema: &SchemaDescriptor,
464    arrow_schema: &'a Schema,
465    name: &str,
466) -> Option<(usize, &'a FieldRef)> {
467    let (root_idx, field) = arrow_schema.fields.find(name)?;
468    if field.data_type().is_nested() {
469        // Nested fields are not supported and require non-trivial logic
470        // to correctly walk the parquet schema accounting for the
471        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
472        //
473        // For example a ListArray could correspond to anything from 1 to 3 levels
474        // in the parquet schema
475        return None;
476    }
477
478    // This could be made more efficient (#TBD)
479    let parquet_idx = (0..parquet_schema.columns().len())
480        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
481    Some((parquet_idx, field))
482}
483
484#[cfg(test)]
485mod test {
486    use crate::arrow::ArrowWriter;
487    use crate::file::metadata::{
488        PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
489        ParquetMetaDataWriter,
490    };
491    use crate::file::properties::{EnabledStatistics, WriterProperties};
492    use crate::schema::parser::parse_message_type;
493    use crate::schema::types::SchemaDescriptor;
494    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
495    use bytes::Bytes;
496    use std::sync::Arc;
497
498    use super::ProjectionMask;
499
500    #[test]
501    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
502    fn test_metadata_read_write_partial_offset() {
503        let parquet_bytes = create_parquet_file();
504
505        // read the metadata from the file WITHOUT the page index structures
506        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
507        let original_metadata = ParquetMetaDataReader::new()
508            .with_metadata_options(Some(options))
509            .parse_and_finish(&parquet_bytes)
510            .unwrap();
511
512        // this should error because the page indexes are not present, but have offsets specified
513        let metadata_bytes = metadata_to_bytes(&original_metadata);
514        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
515        let err = ParquetMetaDataReader::new()
516            .with_metadata_options(Some(options))
517            .with_page_index_policy(PageIndexPolicy::Required) // there are no page indexes in the metadata
518            .parse_and_finish(&metadata_bytes)
519            .err()
520            .unwrap();
521        assert_eq!(
522            err.to_string(),
523            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
524        );
525    }
526
527    #[test]
528    fn test_metadata_read_write_roundtrip() {
529        let parquet_bytes = create_parquet_file();
530
531        // read the metadata from the file
532        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
533        let original_metadata = ParquetMetaDataReader::new()
534            .with_metadata_options(Some(options))
535            .parse_and_finish(&parquet_bytes)
536            .unwrap();
537
538        // read metadata back from the serialized bytes and ensure it is the same
539        let metadata_bytes = metadata_to_bytes(&original_metadata);
540        assert_ne!(
541            metadata_bytes.len(),
542            parquet_bytes.len(),
543            "metadata is subset of parquet"
544        );
545
546        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
547        let roundtrip_metadata = ParquetMetaDataReader::new()
548            .with_metadata_options(Some(options))
549            .parse_and_finish(&metadata_bytes)
550            .unwrap();
551
552        assert_eq!(original_metadata, roundtrip_metadata);
553    }
554
555    #[test]
556    fn test_metadata_read_write_roundtrip_page_index() {
557        let parquet_bytes = create_parquet_file();
558
559        // read the metadata from the file including the page index structures
560        // (which are stored elsewhere in the footer)
561        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
562        let original_metadata = ParquetMetaDataReader::new()
563            .with_metadata_options(Some(options))
564            .with_page_index_policy(PageIndexPolicy::Required)
565            .parse_and_finish(&parquet_bytes)
566            .unwrap();
567
568        // read metadata back from the serialized bytes and ensure it is the same
569        let metadata_bytes = metadata_to_bytes(&original_metadata);
570        let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
571        let roundtrip_metadata = ParquetMetaDataReader::new()
572            .with_metadata_options(Some(options))
573            .with_page_index_policy(PageIndexPolicy::Required)
574            .parse_and_finish(&metadata_bytes)
575            .unwrap();
576
577        // Need to normalize the metadata first to remove offsets in data
578        let original_metadata = normalize_locations(original_metadata);
579        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
580        assert_eq!(
581            format!("{original_metadata:#?}"),
582            format!("{roundtrip_metadata:#?}")
583        );
584        assert_eq!(original_metadata, roundtrip_metadata);
585    }
586
587    /// Sets the page index offset locations in the metadata to `None`
588    ///
589    /// This is because the offsets are used to find the relative location of the index
590    /// structures, and thus differ depending on how the structures are stored.
591    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
592        let mut metadata_builder = metadata.into_builder();
593        for rg in metadata_builder.take_row_groups() {
594            let mut rg_builder = rg.into_builder();
595            for col in rg_builder.take_columns() {
596                rg_builder = rg_builder.add_column_metadata(
597                    col.into_builder()
598                        .set_offset_index_offset(None)
599                        .set_index_page_offset(None)
600                        .set_column_index_offset(None)
601                        .build()
602                        .unwrap(),
603                );
604            }
605            let rg = rg_builder.build().unwrap();
606            metadata_builder = metadata_builder.add_row_group(rg);
607        }
608        metadata_builder.build()
609    }
610
    /// Writes a parquet file into an in-memory buffer
612    fn create_parquet_file() -> Bytes {
613        let mut buf = vec![];
614        let data = vec![100, 200, 201, 300, 102, 33];
615        let array: ArrayRef = Arc::new(Int32Array::from(data));
616        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
617        let props = WriterProperties::builder()
618            .set_statistics_enabled(EnabledStatistics::Page)
619            .set_write_page_header_statistics(true)
620            .build();
621
622        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
623        writer.write(&batch).unwrap();
624        writer.finish().unwrap();
625        drop(writer);
626
627        Bytes::from(buf)
628    }
629
    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
631    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
632        let mut buf = vec![];
633        ParquetMetaDataWriter::new(&mut buf, metadata)
634            .finish()
635            .unwrap();
636        Bytes::from(buf)
637    }
638
639    #[test]
640    fn test_mask_from_column_names() {
641        let schema = parse_schema(
642            "
643            message test_schema {
644                OPTIONAL group a (MAP) {
645                    REPEATED group key_value {
646                        REQUIRED BYTE_ARRAY key (UTF8);
647                        OPTIONAL group value (MAP) {
648                            REPEATED group key_value {
649                                REQUIRED INT32 key;
650                                REQUIRED BOOLEAN value;
651                            }
652                        }
653                    }
654                }
655                REQUIRED INT32 b;
656                REQUIRED DOUBLE c;
657            }
658            ",
659        );
660
661        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
662        assert_eq!(mask.mask.unwrap(), vec![false; 5]);
663
664        let mask = ProjectionMask::columns(&schema, []);
665        assert_eq!(mask.mask.unwrap(), vec![false; 5]);
666
667        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
668        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);
669
670        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
671        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);
672
673        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
674        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);
675
676        let schema = parse_schema(
677            "
678            message test_schema {
679                OPTIONAL group a (LIST) {
680                    REPEATED group list {
681                        OPTIONAL group element (LIST) {
682                            REPEATED group list {
683                                OPTIONAL group element (LIST) {
684                                    REPEATED group list {
685                                        OPTIONAL BYTE_ARRAY element (UTF8);
686                                    }
687                                }
688                            }
689                        }
690                    }
691                }
692                REQUIRED INT32 b;
693            }
694            ",
695        );
696
697        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
698        assert_eq!(mask.mask.unwrap(), [true, true]);
699
700        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
701        assert_eq!(mask.mask.unwrap(), [true, true]);
702
703        let mask =
704            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
705        assert_eq!(mask.mask.unwrap(), [true, true]);
706
707        let mask = ProjectionMask::columns(&schema, ["b"]);
708        assert_eq!(mask.mask.unwrap(), [false, true]);
709
710        let schema = parse_schema(
711            "
712            message test_schema {
713                OPTIONAL INT32 a;
714                OPTIONAL INT32 b;
715                OPTIONAL INT32 c;
716                OPTIONAL INT32 d;
717                OPTIONAL INT32 e;
718            }
719            ",
720        );
721
722        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
723        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);
724
725        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
726        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);
727
728        let schema = parse_schema(
729            "
730            message test_schema {
731                OPTIONAL INT32 a;
732                OPTIONAL INT32 b;
733                OPTIONAL INT32 a;
734                OPTIONAL INT32 d;
735                OPTIONAL INT32 e;
736            }
737            ",
738        );
739
740        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
741        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
742
743        let schema = parse_schema(
744            "
745            message test_schema {
746                OPTIONAL INT32 a;
747                OPTIONAL INT32 aa;
748            }
749            ",
750        );
751
752        let mask = ProjectionMask::columns(&schema, ["a"]);
753        assert_eq!(mask.mask.unwrap(), [true, false]);
754    }
755
756    #[test]
757    fn test_projection_mask_union() {
758        let mut mask1 = ProjectionMask {
759            mask: Some(vec![true, false, true]),
760        };
761        let mask2 = ProjectionMask {
762            mask: Some(vec![false, true, true]),
763        };
764        mask1.union(&mask2);
765        assert_eq!(mask1.mask, Some(vec![true, true, true]));
766
767        let mut mask1 = ProjectionMask { mask: None };
768        let mask2 = ProjectionMask {
769            mask: Some(vec![false, true, true]),
770        };
771        mask1.union(&mask2);
772        assert_eq!(mask1.mask, None);
773
774        let mut mask1 = ProjectionMask {
775            mask: Some(vec![true, false, true]),
776        };
777        let mask2 = ProjectionMask { mask: None };
778        mask1.union(&mask2);
779        assert_eq!(mask1.mask, None);
780
781        let mut mask1 = ProjectionMask { mask: None };
782        let mask2 = ProjectionMask { mask: None };
783        mask1.union(&mask2);
784        assert_eq!(mask1.mask, None);
785    }
786
787    #[test]
788    fn test_projection_mask_intersect() {
789        let mut mask1 = ProjectionMask {
790            mask: Some(vec![true, false, true]),
791        };
792        let mask2 = ProjectionMask {
793            mask: Some(vec![false, true, true]),
794        };
795        mask1.intersect(&mask2);
796        assert_eq!(mask1.mask, Some(vec![false, false, true]));
797
798        let mut mask1 = ProjectionMask { mask: None };
799        let mask2 = ProjectionMask {
800            mask: Some(vec![false, true, true]),
801        };
802        mask1.intersect(&mask2);
803        assert_eq!(mask1.mask, Some(vec![false, true, true]));
804
805        let mut mask1 = ProjectionMask {
806            mask: Some(vec![true, false, true]),
807        };
808        let mask2 = ProjectionMask { mask: None };
809        mask1.intersect(&mask2);
810        assert_eq!(mask1.mask, Some(vec![true, false, true]));
811
812        let mut mask1 = ProjectionMask { mask: None };
813        let mask2 = ProjectionMask { mask: None };
814        mask1.intersect(&mask2);
815        assert_eq!(mask1.mask, None);
816    }
817
818    #[test]
819    fn test_projection_mask_without_nested_no_nested() {
820        // Schema with no nested types
821        let schema = parse_schema(
822            "
823            message test_schema {
824                OPTIONAL INT32 a;
825                OPTIONAL INT32 b;
826                REQUIRED DOUBLE d;
827            }
828            ",
829        );
830
831        let mask = ProjectionMask::all();
832        // All columns are non-nested, but without_nested_types returns a new mask
833        assert_eq!(
834            Some(ProjectionMask::leaves(&schema, [0, 1, 2])),
835            mask.without_nested_types(&schema)
836        );
837
838        // select b, c
839        let mask = ProjectionMask::leaves(&schema, [1, 2]);
840        assert_eq!(Some(mask.clone()), mask.without_nested_types(&schema));
841    }
842
843    #[test]
844    fn test_projection_mask_without_nested_nested() {
845        // Schema with nested types (structs)
846        let schema = parse_schema(
847            "
848            message test_schema {
849                OPTIONAL INT32 a;
850                OPTIONAL group b {
851                    REQUIRED INT32 b1;
852                    OPTIONAL INT64 b2;
853                }
854                OPTIONAL group c (LIST) {
855                    REPEATED group list {
856                        OPTIONAL INT32 element;
857                    }
858                }
859                REQUIRED DOUBLE d;
860            }
861            ",
862        );
863
864        // all leaves --> a, d
865        let mask = ProjectionMask::all();
866        assert_eq!(
867            Some(ProjectionMask::leaves(&schema, [0, 4])),
868            mask.without_nested_types(&schema)
869        );
870
871        // b1 --> empty (it is nested)
872        let mask = ProjectionMask::leaves(&schema, [1]);
873        assert_eq!(None, mask.without_nested_types(&schema));
874
875        // b2, d --> d
876        let mask = ProjectionMask::leaves(&schema, [1, 4]);
877        assert_eq!(
878            Some(ProjectionMask::leaves(&schema, [4])),
879            mask.without_nested_types(&schema)
880        );
881
882        // element --> empty (it is nested)
883        let mask = ProjectionMask::leaves(&schema, [3]);
884        assert_eq!(None, mask.without_nested_types(&schema));
885    }
886
887    #[test]
888    fn test_projection_mask_without_nested_map_only() {
889        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
890        let schema = parse_schema(
891            "
892            message test_schema {
893                required group my_map (MAP) {
894                    repeated group key_value {
895                        required binary key (STRING);
896                        optional int32 value;
897                    }
898                }
899            }
900            ",
901        );
902
903        let mask = ProjectionMask::all();
904        assert_eq!(None, mask.without_nested_types(&schema));
905
906        // key --> empty (it is nested)
907        let mask = ProjectionMask::leaves(&schema, [0]);
908        assert_eq!(None, mask.without_nested_types(&schema));
909
910        // value --> empty (it is nested)
911        let mask = ProjectionMask::leaves(&schema, [1]);
912        assert_eq!(None, mask.without_nested_types(&schema));
913    }
914
915    #[test]
916    fn test_projection_mask_without_nested_map_with_non_nested() {
917        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
918        // with an additional non-nested field
919        let schema = parse_schema(
920            "
921            message test_schema {
922                REQUIRED INT32 a;
923                required group my_map (MAP) {
924                    repeated group key_value {
925                        required binary key (STRING);
926                        optional int32 value;
927                    }
928                }
929                REQUIRED INT32 b;
930            }
931            ",
932        );
933
934        // all leaves --> a, b which are the only non nested ones
935        let mask = ProjectionMask::all();
936        assert_eq!(
937            Some(ProjectionMask::leaves(&schema, [0, 3])),
938            mask.without_nested_types(&schema)
939        );
940
941        // key, value, b --> b (the only non-nested one)
942        let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
943        assert_eq!(
944            Some(ProjectionMask::leaves(&schema, [3])),
945            mask.without_nested_types(&schema)
946        );
947
948        // key, value --> NONE
949        let mask = ProjectionMask::leaves(&schema, [1, 2]);
950        assert_eq!(None, mask.without_nested_types(&schema));
951    }
952
953    #[test]
954    fn test_projection_mask_without_nested_deeply_nested() {
955        // Map of Maps
956        let schema = parse_schema(
957            "
958            message test_schema {
959                OPTIONAL group a (MAP) {
960                    REPEATED group key_value {
961                        REQUIRED BYTE_ARRAY key (UTF8);
962                        OPTIONAL group value (MAP) {
963                            REPEATED group key_value {
964                                REQUIRED INT32 key;
965                                REQUIRED BOOLEAN value;
966                            }
967                        }
968                    }
969                }
970                REQUIRED INT32 b;
971                REQUIRED DOUBLE c;
972            ",
973        );
974
975        let mask = ProjectionMask::all();
976        assert_eq!(
977            Some(ProjectionMask::leaves(&schema, [3, 4])),
978            mask.without_nested_types(&schema)
979        );
980
981        // (first) key, c --> c (the only non-nested one)
982        let mask = ProjectionMask::leaves(&schema, [0, 4]);
983        assert_eq!(
984            Some(ProjectionMask::leaves(&schema, [4])),
985            mask.without_nested_types(&schema)
986        );
987
988        // (second) key, value, b --> b (the only non-nested one)
989        let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
990        assert_eq!(
991            Some(ProjectionMask::leaves(&schema, [3])),
992            mask.without_nested_types(&schema)
993        );
994
995        // key --> NONE (the only non-nested one)
996        let mask = ProjectionMask::leaves(&schema, [0]);
997        assert_eq!(None, mask.without_nested_types(&schema));
998    }
999
1000    #[test]
1001    fn test_projection_mask_without_nested_list() {
1002        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
1003        let schema = parse_schema(
1004            "
1005            message test_schema {
1006                required group my_list (LIST) {
1007                    repeated group list {
1008                        optional binary element (STRING);
1009                    }
1010                }
1011                REQUIRED INT32 b;
1012            }
1013            ",
1014        );
1015
1016        let mask = ProjectionMask::all();
1017        assert_eq!(
1018            Some(ProjectionMask::leaves(&schema, [1])),
1019            mask.without_nested_types(&schema),
1020        );
1021
1022        // element --> empty (it is nested)
1023        let mask = ProjectionMask::leaves(&schema, [0]);
1024        assert_eq!(None, mask.without_nested_types(&schema));
1025
1026        // element, b --> b (it is nested)
1027        let mask = ProjectionMask::leaves(&schema, [0, 1]);
1028        assert_eq!(
1029            Some(ProjectionMask::leaves(&schema, [1])),
1030            mask.without_nested_types(&schema),
1031        );
1032    }
1033
1034    #[test]
1035    fn test_projection_mask_without_nested_single_leaf_struct() {
1036        // Regression: a single-leaf struct is still nested.
1037        let schema = parse_schema(
1038            "
1039            message test_schema {
1040                OPTIONAL group address {
1041                    REQUIRED BYTE_ARRAY street (UTF8);
1042                }
1043                REQUIRED INT32 id;
1044            }
1045            ",
1046        );
1047
1048        // street -> empty; root is a struct
1049        let mask = ProjectionMask::leaves(&schema, [0]);
1050        assert_eq!(None, mask.without_nested_types(&schema));
1051
1052        // street, id --> id only
1053        let mask = ProjectionMask::leaves(&schema, [0, 1]);
1054        assert_eq!(
1055            Some(ProjectionMask::leaves(&schema, [1])),
1056            mask.without_nested_types(&schema)
1057        );
1058
1059        // all --> id only
1060        let mask = ProjectionMask::all();
1061        assert_eq!(
1062            Some(ProjectionMask::leaves(&schema, [1])),
1063            mask.without_nested_types(&schema)
1064        );
1065    }
1066
1067    /// Converts a schema string into a `SchemaDescriptor`
1068    fn parse_schema(schema: &str) -> SchemaDescriptor {
1069        let parquet_group_type = parse_message_type(schema).unwrap();
1070        SchemaDescriptor::new(Arc::new(parquet_group_type))
1071    }
1072}