// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! API for reading/writing Arrow [`RecordBatch`]es and [`Array`]s to/from
//! Parquet files.
//!
//! See the [crate-level documentation](crate) for more details on other APIs.
//!
//! # Schema Conversion
//!
//! These APIs ensure that data in Arrow [`RecordBatch`]es written to Parquet are
//! read back as [`RecordBatch`]es with the exact same types and values.
//!
//! Parquet and Arrow have different type systems, and there is not
//! always a one to one mapping between the two systems. For example, data
//! stored as a Parquet [`BYTE_ARRAY`] can be read as either an Arrow
//! [`BinaryViewArray`] or [`BinaryArray`].
//!
//! To recover the original Arrow types, the writers in this module add a "hint" to
//! the metadata in the [`ARROW_SCHEMA_META_KEY`] key which records the original Arrow
//! schema. The metadata hint follows the same convention as arrow-cpp based
//! implementations such as `pyarrow`. The reader looks for the schema hint in the
//! metadata to determine Arrow types, and if it is not present, infers the Arrow schema
//! from the Parquet schema.
//!
//! In situations where the embedded Arrow schema is not compatible with the Parquet
//! schema, the Parquet schema takes precedence and no error is raised.
//! See [#1663](https://github.com/apache/arrow-rs/issues/1663).
//!
//! You can also control the type conversion process in more detail using:
//!
//! * [`ArrowSchemaConverter`] to control the conversion of Arrow types to Parquet
//!   types.
//!
//! * [`ArrowReaderOptions::with_schema`] to explicitly specify your own Arrow schema hint
//!   to use when reading Parquet, overriding any metadata that may be present.
//!
//! [`RecordBatch`]: arrow_array::RecordBatch
//! [`Array`]: arrow_array::Array
//! [`BYTE_ARRAY`]: crate::basic::Type::BYTE_ARRAY
//! [`BinaryViewArray`]: arrow_array::BinaryViewArray
//! [`BinaryArray`]: arrow_array::BinaryArray
//! [`ArrowReaderOptions::with_schema`]: arrow_reader::ArrowReaderOptions::with_schema
//!
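//! # Example: Converting an Arrow schema to a Parquet schema
//!
//! A minimal sketch of [`ArrowSchemaConverter`], using its default options, to
//! inspect the Parquet schema the writers in this module would produce for a
//! given Arrow schema:
//!
//! ```rust
//! # use arrow_schema::{DataType, Field, Schema};
//! # use parquet::arrow::ArrowSchemaConverter;
//! let schema = Schema::new(vec![
//!     Field::new("id", DataType::Int32, false),
//!     Field::new("val", DataType::Utf8, true),
//! ]);
//!
//! // Convert the Arrow schema into an equivalent Parquet schema descriptor
//! let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
//!
//! // Each primitive Arrow field maps to a single Parquet leaf column here
//! assert_eq!(parquet_schema.num_columns(), 2);
//! ```
//!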
//! # Example: Writing Arrow `RecordBatch` to Parquet file
//!
//! ```rust
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! # use parquet::file::properties::WriterProperties;
//! # use tempfile::tempfile;
//! # use std::sync::Arc;
//! # use parquet::basic::Compression;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let batch = RecordBatch::try_from_iter(vec![
//!   ("id", Arc::new(ids) as ArrayRef),
//!   ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = tempfile().unwrap();
//!
//! // WriterProperties can be used to set Parquet file options
//! let props = WriterProperties::builder()
//!     .set_compression(Compression::SNAPPY)
//!     .build();
//!
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
//! # Example: Reading Parquet file into Arrow `RecordBatch`
//!
//! ```rust
//! # use std::fs::File;
//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # use std::sync::Arc;
//! # use arrow_array::Int32Array;
//! # use arrow::datatypes::{DataType, Field, Schema};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! #
//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! # let schema = Arc::new(Schema::new(vec![
//! #     Field::new("id", DataType::Int32, false),
//! # ]));
//! #
//! # let file = File::create("data.parquet").unwrap();
//! #
//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
//! # let batches = vec![batch];
//! #
//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
//! #
//! # for batch in batches {
//! #     writer.write(&batch).expect("Writing batch");
//! # }
//! # writer.close().unwrap();
//! #
//! let file = File::open("data.parquet").unwrap();
//!
//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
//! println!("Converted arrow schema is: {}", builder.schema());
//!
//! let mut reader = builder.build().unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//!
//! println!("Read {} records.", record_batch.num_rows());
//! ```
//!
//! # Example: Reading non-uniformly encrypted parquet file into arrow record batch
//!
//! Note: This requires the experimental `encryption` feature to be enabled at compile time.
//!
#![cfg_attr(feature = "encryption", doc = "```rust")]
#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::{types, RecordBatch};
//! # use parquet::arrow::arrow_reader::{
//! #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
//! # };
//! # use arrow_array::cast::AsArray;
//! # use parquet::file::metadata::ParquetMetaData;
//! # use tempfile::tempfile;
//! # use std::fs::File;
//! # use parquet::encryption::decrypt::FileDecryptionProperties;
//! # let test_data = arrow::util::test_util::parquet_test_data();
//! # let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
//! #
//! let file = File::open(path).unwrap();
//!
//! // Define the AES encryption keys required for decrypting the footer metadata
//! // and column-specific data. If only a footer key is used then it is assumed that the
//! // file uses uniform encryption and all columns are encrypted with the footer key.
//! // If any column keys are specified, other columns without a key provided are assumed
//! // to be unencrypted.
//! let footer_key = "0123456789012345".as_bytes(); // Keys are 128 bits (16 bytes)
//! let column_1_key = "1234567890123450".as_bytes();
//! let column_2_key = "1234567890123451".as_bytes();
//!
//! let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
//!     .with_column_key("double_field", column_1_key.to_vec())
//!     .with_column_key("float_field", column_2_key.to_vec())
//!     .build()
//!     .unwrap();
//!
//! let options = ArrowReaderOptions::default()
//!     .with_file_decryption_properties(decryption_properties);
//! let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
//! let file_metadata = reader_metadata.metadata().file_metadata();
//! assert_eq!(50, file_metadata.num_rows());
//!
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
//!     .unwrap()
//!     .build()
//!     .unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//! assert_eq!(50, record_batch.num_rows());
//! ```

experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;

mod record_reader;
experimental!(mod schema);

pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

pub use self::schema::{
    ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
    parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
};

/// Schema metadata key used to store serialized Arrow schema
///
/// The Arrow schema is encoded using the Arrow IPC format, and then base64
/// encoded. This is the same format used by arrow-cpp systems, such as pyarrow.
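///
/// A minimal sketch showing that the hint appears in a written file's key-value
/// metadata (writing to an in-memory buffer here purely for illustration):
///
/// ```rust
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use bytes::Bytes;
/// # use parquet::arrow::{ARROW_SCHEMA_META_KEY, ArrowWriter};
/// # use parquet::file::reader::{FileReader, SerializedFileReader};
/// let ids = Int32Array::from(vec![1, 2, 3]);
/// let batch = RecordBatch::try_from_iter(vec![("id", Arc::new(ids) as ArrayRef)]).unwrap();
///
/// let mut buf = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
///
/// // The footer metadata now contains the base64 encoded Arrow schema hint
/// let reader = SerializedFileReader::new(Bytes::from(buf)).unwrap();
/// let key_value_metadata = reader
///     .metadata()
///     .file_metadata()
///     .key_value_metadata()
///     .unwrap();
/// assert!(key_value_metadata.iter().any(|kv| kv.key == ARROW_SCHEMA_META_KEY));
/// ```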
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
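///
/// A minimal sketch of attaching a field id to an Arrow field (the id value `42`
/// is an arbitrary example):
///
/// ```rust
/// # use std::collections::HashMap;
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
/// // Writers in this module will store this id in the Parquet schema for `id`
/// let field = Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
///     PARQUET_FIELD_ID_META_KEY.to_string(),
///     "42".to_string(),
/// )]));
/// ```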
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean         leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// For non-nested schemas, i.e. those containing only primitive columns, the roots
/// and leaves are the same
///
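/// A minimal sketch of building masks for a schema like the one above, using
/// `parse_message_type` to construct the `SchemaDescriptor` purely for
/// illustration:
///
/// ```rust
/// # use std::sync::Arc;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::schema::parser::parse_message_type;
/// # use parquet::schema::types::SchemaDescriptor;
/// let message_type = "
///   message schema {
///     REQUIRED BOOLEAN leaf_1;
///     REQUIRED group group {
///       OPTIONAL INT32 leaf_2;
///       OPTIONAL INT64 leaf_3;
///     }
///   }
/// ";
/// let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
///
/// // Select `leaf_1` and `leaf_3` by leaf index
/// let mask = ProjectionMask::leaves(&schema, [0, 2]);
/// assert!(mask.leaf_included(0) && !mask.leaf_included(1) && mask.leaf_included(2));
///
/// // Selecting the root `group` selects both of its leaves
/// let mask = ProjectionMask::roots(&schema, [1]);
/// assert!(!mask.leaf_included(0) && mask.leaf_included(1) && mask.leaf_included(2));
/// ```
///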
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If `Some`, a leaf column should be included if the value at
    /// the corresponding index is true
    ///
    /// If `None`, all columns should be included
    ///
    /// # Examples
    ///
    /// Given an original parquet schema with leaf columns `[a, b, c, d]`:
    ///
    /// A mask of `[true, false, true, false]` will result in a schema 2
    /// elements long:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `c`
    ///
    /// A mask of `None` will result in a schema 4 elements long:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `b`
    /// * `fields[2]`: `c`
    /// * `fields[3]`: `d`
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] which selects all columns
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] which selects no columns
    pub fn none(len: usize) -> Self {
        Self {
            mask: Some(vec![false; len]),
        }
    }

    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the specified root columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }

        let mask = (0..schema.num_columns())
            .map(|leaf_idx| {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                root_mask[root_idx]
            })
            .collect();

        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the named columns
    ///
    /// All leaf columns that fall below a given name will be selected. For example, given
    /// the schema
    /// ```ignore
    /// message schema {
    ///   OPTIONAL group a (MAP) {
    ///     REPEATED group key_value {
    ///       REQUIRED BYTE_ARRAY key (UTF8);  // leaf index 0
    ///       OPTIONAL group value (MAP) {
    ///         REPEATED group key_value {
    ///           REQUIRED INT32 key;          // leaf index 1
    ///           REQUIRED BOOLEAN value;      // leaf index 2
    ///         }
    ///       }
    ///     }
    ///   }
    ///   REQUIRED INT32 b;                    // leaf index 3
    ///   REQUIRED DOUBLE c;                   // leaf index 4
    /// }
    /// ```
    /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return
    /// columns 0, 1, and 2.
    ///
    /// Note: repeated or out of order names will not impact the final mask.
    ///
    /// i.e. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`.
    ///
    /// Also, this will not produce the desired results if a column contains a '.' in its name.
    /// Use [`Self::leaves`] or [`Self::roots`] in that case.
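    ///
    /// A minimal sketch against a small flat schema (the two-column schema here is
    /// an arbitrary example):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::schema::parser::parse_message_type;
    /// # use parquet::schema::types::SchemaDescriptor;
    /// let message_type = "
    ///   message schema {
    ///     REQUIRED INT32 b;
    ///     REQUIRED DOUBLE c;
    ///   }
    /// ";
    /// let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    ///
    /// // Select only column "c" by name
    /// let mask = ProjectionMask::columns(&schema, ["c"]);
    /// assert!(!mask.leaf_included(0));
    /// assert!(mask.leaf_included(1));
    /// ```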
    pub fn columns<'a>(
        schema: &SchemaDescriptor,
        names: impl IntoIterator<Item = &'a str>,
    ) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for name in names {
            let name_path: Vec<&str> = name.split('.').collect();
            for (idx, col) in schema.columns().iter().enumerate() {
                let path = col.path().parts();
                // searching for "a.b.c" cannot match "a.b"
                if name_path.len() > path.len() {
                    continue;
                }
                // now path >= name_path, so check that each element in name_path matches
                if name_path.iter().zip(path.iter()).all(|(a, b)| a == b) {
                    mask[idx] = true;
                }
            }
        }

        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf column `leaf_idx` is included by the mask
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }

    /// Union two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// union(mask1, mask2) = [true, true, true]
    /// ```
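    ///
    /// A minimal runnable sketch (the flat three-column schema is an arbitrary
    /// example):
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::schema::parser::parse_message_type;
    /// # use parquet::schema::types::SchemaDescriptor;
    /// # let message_type = "
    /// #   message schema {
    /// #     REQUIRED INT32 a;
    /// #     REQUIRED INT32 b;
    /// #     REQUIRED INT32 c;
    /// #   }
    /// # ";
    /// # let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    /// let mut mask = ProjectionMask::leaves(&schema, [0, 2]);
    /// mask.union(&ProjectionMask::leaves(&schema, [1, 2]));
    /// // All three leaves are now selected
    /// assert!((0..3).all(|i| mask.leaf_included(i)));
    /// ```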
    pub fn union(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) | (_, None) => self.mask = None,
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
                self.mask = Some(mask);
            }
        }
    }

    /// Intersect two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// intersect(mask1, mask2) = [false, false, true]
    /// ```
    pub fn intersect(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) => self.mask = other.mask.clone(),
            (_, None) => {}
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
                self.mask = Some(mask);
            }
        }
    }
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
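///
/// A minimal sketch, deriving the Parquet schema from the Arrow schema with
/// [`ArrowSchemaConverter`] so the two are guaranteed to line up:
///
/// ```rust
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::{ArrowSchemaConverter, parquet_column};
/// let arrow_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, false),
/// ]);
/// let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
///
/// // "b" is the second root (and leaf) column in both schemas
/// let (parquet_idx, field) = parquet_column(&parquet_schema, &arrow_schema, "b").unwrap();
/// assert_eq!(parquet_idx, 1);
/// assert_eq!(field.name(), "b");
/// ```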
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;
    if field.data_type().is_nested() {
        // Nested fields are not supported and require non-trivial logic
        // to correctly walk the parquet schema accounting for the
        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
        //
        // For example a ListArray could correspond to anything from 1 to 3 levels
        // in the parquet schema
        return None;
    }

    // This could be made more efficient (#TBD)
    let parquet_idx = (0..parquet_schema.columns().len())
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    use super::ProjectionMask;

    #[test]
    #[allow(deprecated)]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // this should error because the page indexes are not present, but have offsets specified
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
        );
    }

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    #[allow(deprecated)]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Write a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_write_page_header_statistics(true)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    #[test]
    fn test_mask_from_column_names() {
        let message_type = "
            message test_schema {
                OPTIONAL group a (MAP) {
                    REPEATED group key_value {
                        REQUIRED BYTE_ARRAY key (UTF8);
                        OPTIONAL group value (MAP) {
                            REPEATED group key_value {
                                REQUIRED INT32 key;
                                REQUIRED BOOLEAN value;
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
                REQUIRED DOUBLE c;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, []);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL group a (LIST) {
                    REPEATED group list {
                        OPTIONAL group element (LIST) {
                            REPEATED group list {
                                OPTIONAL group element (LIST) {
                                    REPEATED group list {
                                        OPTIONAL BYTE_ARRAY element (UTF8);
                                    }
                                }
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask =
            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["b"]);
        assert_eq!(mask.mask.unwrap(), [false, true]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 c;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);

        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 a;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 aa;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a"]);
        assert_eq!(mask.mask.unwrap(), [true, false]);
    }

    #[test]
    fn test_projection_mask_union() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, true, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);
    }

    #[test]
    fn test_projection_mask_intersect() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, true, true]));

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, None);
    }
}