parquet/arrow/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! API for reading/writing
//! Arrow [RecordBatch](arrow_array::RecordBatch)es and
//! [Array](arrow_array::Array)s to/from Parquet files.
//!
//! See the [crate-level documentation](crate) for more details.
//!
//! # Example of writing an Arrow record batch to a Parquet file
//!
//! ```rust
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! # use parquet::file::properties::WriterProperties;
//! # use tempfile::tempfile;
//! # use std::sync::Arc;
//! # use parquet::basic::Compression;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let batch = RecordBatch::try_from_iter(vec![
//!   ("id", Arc::new(ids) as ArrayRef),
//!   ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = tempfile().unwrap();
//!
//! // WriterProperties can be used to set Parquet file options
//! let props = WriterProperties::builder()
//!     .set_compression(Compression::SNAPPY)
//!     .build();
//!
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
//! # Example of reading a Parquet file into an Arrow record batch
//!
//! ```rust
//! # use std::fs::File;
//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # use std::sync::Arc;
//! # use arrow_array::Int32Array;
//! # use arrow::datatypes::{DataType, Field, Schema};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! #
//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! # let schema = Arc::new(Schema::new(vec![
//! #     Field::new("id", DataType::Int32, false),
//! # ]));
//! #
//! # let file = File::create("data.parquet").unwrap();
//! #
//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
//! # let batches = vec![batch];
//! #
//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
//! #
//! # for batch in batches {
//! #     writer.write(&batch).expect("Writing batch");
//! # }
//! # writer.close().unwrap();
//! #
//! let file = File::open("data.parquet").unwrap();
//!
//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
//! println!("Converted arrow schema is: {}", builder.schema());
//!
//! let mut reader = builder.build().unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//!
//! println!("Read {} records.", record_batch.num_rows());
//! ```
//!
//! # Example of reading a non-uniformly encrypted Parquet file into an Arrow record batch
//!
//! Note: This requires the experimental `encryption` feature to be enabled at compile time.
//!
#![cfg_attr(feature = "encryption", doc = "```rust")]
#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::{types, RecordBatch};
//! # use parquet::arrow::arrow_reader::{
//! #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
//! # };
//! # use arrow_array::cast::AsArray;
//! # use parquet::file::metadata::ParquetMetaData;
//! # use tempfile::tempfile;
//! # use std::fs::File;
//! # use parquet::encryption::decrypt::FileDecryptionProperties;
//! # let test_data = arrow::util::test_util::parquet_test_data();
//! # let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
//! #
//! let file = File::open(path).unwrap();
//!
//! // Define the AES encryption keys required for decrypting the footer metadata
//! // and column-specific data. If only a footer key is used then it is assumed that the
//! // file uses uniform encryption and all columns are encrypted with the footer key.
//! // If any column keys are specified, other columns without a key provided are assumed
//! // to be unencrypted.
//! let footer_key = "0123456789012345".as_bytes(); // Keys are 128 bits (16 bytes)
//! let column_1_key = "1234567890123450".as_bytes();
//! let column_2_key = "1234567890123451".as_bytes();
//!
//! let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
//!     .with_column_key("double_field", column_1_key.to_vec())
//!     .with_column_key("float_field", column_2_key.to_vec())
//!     .build()
//!     .unwrap();
//!
//! let options = ArrowReaderOptions::default()
//!     .with_file_decryption_properties(decryption_properties);
//! let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
//! let file_metadata = reader_metadata.metadata().file_metadata();
//! assert_eq!(50, file_metadata.num_rows());
//!
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
//!     .unwrap()
//!     .build()
//!     .unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//! assert_eq!(50, record_batch.num_rows());
//! ```

experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;

mod record_reader;
experimental!(mod schema);

use std::sync::Arc;

pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::{SchemaDescriptor, Type};
use arrow_schema::{FieldRef, Schema};

// continue to export deprecated methods until they are removed
#[allow(deprecated)]
pub use self::schema::arrow_to_parquet_schema;

pub use self::schema::{
    add_encoded_arrow_schema_to_metadata, encode_arrow_schema, parquet_to_arrow_field_levels,
    parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, ArrowSchemaConverter, FieldLevels,
};

/// Schema metadata key used to store the serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
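///
/// # Example
///
/// A minimal sketch of attaching a field id through [`Field`] metadata (the field
/// name and id value here are arbitrary placeholders):
///
/// ```rust
/// use std::collections::HashMap;
/// use arrow_schema::{DataType, Field};
/// use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
///
/// // Attach field id "42" to the field; this key is consulted when
/// // populating the Parquet field id for the corresponding column
/// let field = Field::new("my_field", DataType::Int64, false).with_metadata(HashMap::from([(
///     PARQUET_FIELD_ID_META_KEY.to_string(),
///     "42".to_string(),
/// )]));
/// assert_eq!(field.metadata()[PARQUET_FIELD_ID_META_KEY], "42");
/// ```
///
/// [`Field`]: arrow_schema::Field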
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean         leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`.
///
/// For non-nested schemas, i.e. those containing only primitive columns, the roots
/// and leaves are the same.
///
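/// # Example
///
/// A minimal sketch of constructing masks over a schema equivalent to the one above,
/// parsed with [`parse_message_type`](crate::schema::parser::parse_message_type)
/// (the group is named `my_group` here to avoid reusing the `group` keyword as a
/// field name):
///
/// ```rust
/// use std::sync::Arc;
/// use parquet::arrow::ProjectionMask;
/// use parquet::schema::parser::parse_message_type;
/// use parquet::schema::types::SchemaDescriptor;
///
/// let message_type = "
/// message schema {
///   REQUIRED BOOLEAN leaf_1;
///   REQUIRED group my_group {
///     OPTIONAL INT32 leaf_2;
///     OPTIONAL INT64 leaf_3;
///   }
/// }
/// ";
/// let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
///
/// // Select leaves 0 and 2, i.e. `leaf_1` and `leaf_3`
/// let mask = ProjectionMask::leaves(&schema, [0, 2]);
/// assert!(mask.leaf_included(0));
/// assert!(!mask.leaf_included(1));
/// assert!(mask.leaf_included(2));
///
/// // Select root 1, i.e. `my_group`, and therefore both `leaf_2` and `leaf_3`
/// let mask = ProjectionMask::roots(&schema, [1]);
/// assert!(!mask.leaf_included(0));
/// assert!(mask.leaf_included(1));
/// assert!(mask.leaf_included(2));
/// ```
///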
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If present, a leaf column should be included if the value at
    /// the corresponding index is true
    ///
    /// If `None`, include all columns
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] which selects all columns
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
    ///
    /// Note: repeated or out-of-order indices will not impact the final mask,
    /// e.g. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the specified root columns
    ///
    /// Note: repeated or out-of-order indices will not impact the final mask,
    /// e.g. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }

        let mask = (0..schema.num_columns())
            .map(|leaf_idx| {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                root_mask[root_idx]
            })
            .collect();

        Self { mask: Some(mask) }
    }

    // Given a starting point in the schema, do a DFS from that node, adding leaf paths to `paths`.
    fn find_leaves(root: &Arc<Type>, parent: Option<&String>, paths: &mut Vec<String>) {
        let path = parent
            .map(|p| [p, root.name()].join("."))
            .unwrap_or(root.name().to_string());
        if root.is_group() {
            for child in root.get_fields() {
                Self::find_leaves(child, Some(&path), paths);
            }
        } else {
            // Reached a leaf, add it to `paths`
            paths.push(path);
        }
    }

    /// Create a [`ProjectionMask`] which selects only the named columns
    ///
    /// All leaf columns that fall below a given name will be selected. For example, given
    /// the schema
    /// ```ignore
    /// message schema {
    ///   OPTIONAL group a (MAP) {
    ///     REPEATED group key_value {
    ///       REQUIRED BYTE_ARRAY key (UTF8);  // leaf index 0
    ///       OPTIONAL group value (MAP) {
    ///         REPEATED group key_value {
    ///           REQUIRED INT32 key;          // leaf index 1
    ///           REQUIRED BOOLEAN value;      // leaf index 2
    ///         }
    ///       }
    ///     }
    ///   }
    ///   REQUIRED INT32 b;                    // leaf index 3
    ///   REQUIRED DOUBLE c;                   // leaf index 4
    /// }
    /// ```
    /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return
    /// columns 0, 1, and 2.
    ///
    /// Note: repeated or out-of-order names will not impact the final mask,
    /// e.g. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`.
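    ///
    /// # Example
    ///
    /// A minimal sketch against the nested schema above, parsed with
    /// [`parse_message_type`](crate::schema::parser::parse_message_type):
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use parquet::arrow::ProjectionMask;
    /// use parquet::schema::parser::parse_message_type;
    /// use parquet::schema::types::SchemaDescriptor;
    ///
    /// let message_type = "
    /// message schema {
    ///   OPTIONAL group a (MAP) {
    ///     REPEATED group key_value {
    ///       REQUIRED BYTE_ARRAY key (UTF8);
    ///       OPTIONAL group value (MAP) {
    ///         REPEATED group key_value {
    ///           REQUIRED INT32 key;
    ///           REQUIRED BOOLEAN value;
    ///         }
    ///       }
    ///     }
    ///   }
    ///   REQUIRED INT32 b;
    ///   REQUIRED DOUBLE c;
    /// }
    /// ";
    /// let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    ///
    /// // Select the inner map under `a`, and the primitive column `c`
    /// let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "c"]);
    /// assert!(!mask.leaf_included(0)); // a.key_value.key
    /// assert!(mask.leaf_included(1));  // a.key_value.value.key_value.key
    /// assert!(mask.leaf_included(2));  // a.key_value.value.key_value.value
    /// assert!(!mask.leaf_included(3)); // b
    /// assert!(mask.leaf_included(4));  // c
    /// ```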
    pub fn columns<'a>(
        schema: &SchemaDescriptor,
        names: impl IntoIterator<Item = &'a str>,
    ) -> Self {
        // First build a vector of dot-separated paths for the leaf columns
        let mut paths: Vec<String> = vec![];
        for root in schema.root_schema().get_fields() {
            Self::find_leaves(root, None, &mut paths);
        }
        assert_eq!(paths.len(), schema.num_columns());

        let mut mask = vec![false; schema.num_columns()];
        for name in names {
            for idx in 0..schema.num_columns() {
                if paths[idx].starts_with(name) {
                    mask[idx] = true;
                }
            }
        }

        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf column `leaf_idx` is included by the mask
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }

    /// Union two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// union(mask1, mask2) = [true, true, true]
    /// ```
    pub fn union(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) | (_, None) => self.mask = None,
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
                self.mask = Some(mask);
            }
        }
    }

    /// Intersect two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// intersect(mask1, mask2) = [false, false, true]
    /// ```
    pub fn intersect(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) => self.mask = other.mask.clone(),
            (_, None) => {}
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
                self.mask = Some(mask);
            }
        }
    }
}

/// Looks up a parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
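///
/// # Example
///
/// A minimal sketch, deriving a matching Arrow schema from the parquet schema with
/// [`parquet_to_arrow_schema`]:
///
/// ```rust
/// use std::sync::Arc;
/// use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
/// use parquet::schema::parser::parse_message_type;
/// use parquet::schema::types::SchemaDescriptor;
///
/// let message_type = "message schema { REQUIRED INT32 a; REQUIRED DOUBLE b; }";
/// let parquet_schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
/// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
///
/// // Column "b" is the second leaf (and second root) column
/// let (parquet_idx, field) = parquet_column(&parquet_schema, &arrow_schema, "b").unwrap();
/// assert_eq!(parquet_idx, 1);
/// assert_eq!(field.name(), "b");
/// ```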
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;
    if field.data_type().is_nested() {
        // Nested fields are not supported and require non-trivial logic
        // to correctly walk the parquet schema accounting for the
        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
        //
        // For example a ListArray could correspond to anything from 1 to 3 levels
        // in the parquet schema
        return None;
    }

    // This could be made more efficient (#TBD)
    let parquet_idx = (0..parquet_schema.columns().len())
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    use super::ProjectionMask;

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // this should error because the page indexes are not present, but have offsets specified
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
        );
    }

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Write a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into an in-memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    #[test]
    fn test_mask_from_column_names() {
        let message_type = "
            message test_schema {
                OPTIONAL group a (MAP) {
                    REPEATED group key_value {
                        REQUIRED BYTE_ARRAY key (UTF8);
                        OPTIONAL group value (MAP) {
                            REPEATED group key_value {
                                REQUIRED INT32 key;
                                REQUIRED BOOLEAN value;
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
                REQUIRED DOUBLE c;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, []);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL group a (LIST) {
                    REPEATED group list {
                        OPTIONAL group element (LIST) {
                            REPEATED group list {
                                OPTIONAL group element (LIST) {
                                    REPEATED group list {
                                        OPTIONAL BYTE_ARRAY element (UTF8);
                                    }
                                }
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask =
            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["b"]);
        assert_eq!(mask.mask.unwrap(), [false, true]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 c;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);

        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 a;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
    }

    #[test]
    fn test_projection_mask_union() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, true, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);
    }

    #[test]
    fn test_projection_mask_intersect() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, true, true]));

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, None);
    }
}