parquet/arrow/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! API for reading/writing Arrow [`RecordBatch`]es and [`Array`]s to/from
//! Parquet Files.
//!
//! See the [crate-level documentation](crate) for more details on other APIs
//!
//! # Schema Conversion
//!
//! These APIs ensure that data in Arrow [`RecordBatch`]es written to Parquet are
//! read back as [`RecordBatch`]es with the exact same types and values.
//!
//! Parquet and Arrow have different type systems, and there is not
//! always a one to one mapping between the systems. For example, data
//! stored as a Parquet [`BYTE_ARRAY`] can be read as either an Arrow
//! [`BinaryViewArray`] or [`BinaryArray`].
//!
//! To recover the original Arrow types, the writers in this module add a "hint" to
//! the metadata in the [`ARROW_SCHEMA_META_KEY`] key which records the original Arrow
//! schema. The metadata hint follows the same convention as arrow-cpp based
//! implementations such as `pyarrow`. The reader looks for the schema hint in the
//! metadata to determine Arrow types, and if it is not present, infers the Arrow schema
//! from the Parquet schema.
//!
//! In situations where the embedded Arrow schema is not compatible with the Parquet
//! schema, the Parquet schema takes precedence and no error is raised.
//! See [#1663](https://github.com/apache/arrow-rs/issues/1663)
//! You can also control the type conversion process in more detail using:
//!
//! * [`ArrowSchemaConverter`] to control the conversion of Arrow types to
//!   Parquet types (see the sketch below).
//!
//! * [`ArrowReaderOptions::with_schema`] to explicitly specify your own Arrow schema hint
//!   to use when reading Parquet, overriding any metadata that may be present.
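//!
//! For example, a minimal sketch of converting an Arrow schema to a Parquet
//! schema with [`ArrowSchemaConverter`] (the field names here are illustrative):
//!
//! ```rust
//! # use arrow_schema::{DataType, Field, Schema};
//! # use parquet::arrow::ArrowSchemaConverter;
//! let arrow_schema = Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("name", DataType::Utf8, true),
//! ]);
//! // Convert to a Parquet `SchemaDescriptor`, which can then be used,
//! // for example, to build a `ProjectionMask`
//! let parquet_schema = ArrowSchemaConverter::new()
//!     .convert(&arrow_schema)
//!     .unwrap();
//! assert_eq!(parquet_schema.num_columns(), 2);
//! ```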
//!
//! [`RecordBatch`]: arrow_array::RecordBatch
//! [`Array`]: arrow_array::Array
//! [`BYTE_ARRAY`]: crate::basic::Type::BYTE_ARRAY
//! [`BinaryViewArray`]: arrow_array::BinaryViewArray
//! [`BinaryArray`]: arrow_array::BinaryArray
//! [`ArrowReaderOptions::with_schema`]: arrow_reader::ArrowReaderOptions::with_schema
//!
//! # Example: Writing Arrow `RecordBatch` to Parquet file
//!
//! ```rust
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! # use parquet::file::properties::WriterProperties;
//! # use tempfile::tempfile;
//! # use std::sync::Arc;
//! # use parquet::basic::Compression;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let batch = RecordBatch::try_from_iter(vec![
//!   ("id", Arc::new(ids) as ArrayRef),
//!   ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = tempfile().unwrap();
//!
//! // WriterProperties can be used to set Parquet file options
//! let props = WriterProperties::builder()
//!     .set_compression(Compression::SNAPPY)
//!     .build();
//!
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
//! # Example: Reading Parquet file into Arrow `RecordBatch`
//!
//! ```rust
//! # use std::fs::File;
//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//! # use std::sync::Arc;
//! # use arrow_array::Int32Array;
//! # use arrow::datatypes::{DataType, Field, Schema};
//! # use arrow_array::RecordBatch;
//! # use parquet::arrow::arrow_writer::ArrowWriter;
//! #
//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! # let schema = Arc::new(Schema::new(vec![
//! #     Field::new("id", DataType::Int32, false),
//! # ]));
//! #
//! # let file = File::create("data.parquet").unwrap();
//! #
//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
//! # let batches = vec![batch];
//! #
//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
//! #
//! # for batch in batches {
//! #     writer.write(&batch).expect("Writing batch");
//! # }
//! # writer.close().unwrap();
//! #
//! let file = File::open("data.parquet").unwrap();
//!
//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
//! println!("Converted arrow schema is: {}", builder.schema());
//!
//! let mut reader = builder.build().unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//!
//! println!("Read {} records.", record_batch.num_rows());
//! ```
//!
//! # Example: Reading non-uniformly encrypted parquet file into arrow record batch
//!
//! Note: This requires the experimental `encryption` feature to be enabled at compile time.
//!
#![cfg_attr(feature = "encryption", doc = "```rust")]
#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
//! # use arrow_array::{Int32Array, ArrayRef};
//! # use arrow_array::{types, RecordBatch};
//! # use parquet::arrow::arrow_reader::{
//! #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
//! # };
//! # use arrow_array::cast::AsArray;
//! # use parquet::file::metadata::ParquetMetaData;
//! # use tempfile::tempfile;
//! # use std::fs::File;
//! # use parquet::encryption::decrypt::FileDecryptionProperties;
//! # let test_data = arrow::util::test_util::parquet_test_data();
//! # let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
//! #
//! let file = File::open(path).unwrap();
//!
//! // Define the AES encryption keys required for decrypting the footer metadata
//! // and column-specific data. If only a footer key is used then it is assumed that the
//! // file uses uniform encryption and all columns are encrypted with the footer key.
//! // If any column keys are specified, other columns without a key provided are assumed
//! // to be unencrypted.
//! let footer_key = "0123456789012345".as_bytes(); // Keys are 128 bits (16 bytes)
//! let column_1_key = "1234567890123450".as_bytes();
//! let column_2_key = "1234567890123451".as_bytes();
//!
//! let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
//!     .with_column_key("double_field", column_1_key.to_vec())
//!     .with_column_key("float_field", column_2_key.to_vec())
//!     .build()
//!     .unwrap();
//!
//! let options = ArrowReaderOptions::default()
//!     .with_file_decryption_properties(decryption_properties);
//! let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
//! let file_metadata = reader_metadata.metadata().file_metadata();
//! assert_eq!(50, file_metadata.num_rows());
//!
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
//!     .unwrap()
//!     .build()
//!     .unwrap();
//!
//! let record_batch = reader.next().unwrap().unwrap();
//! assert_eq!(50, record_batch.num_rows());
//! ```

experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;

mod record_reader;
experimental!(mod schema);

use std::sync::Arc;

pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::{SchemaDescriptor, Type};
use arrow_schema::{FieldRef, Schema};
// continue to export deprecated methods until they are removed
#[allow(deprecated)]
pub use self::schema::arrow_to_parquet_schema;

pub use self::schema::{
    add_encoded_arrow_schema_to_metadata, encode_arrow_schema, parquet_to_arrow_field_levels,
    parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, ArrowSchemaConverter, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow schema
///
/// The Arrow schema is encoded using the Arrow IPC format, and then base64
/// encoded. This is the same format used by arrow-cpp systems, such as pyarrow.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
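///
/// For example, a minimal sketch of annotating an Arrow `Field` with a Parquet
/// field id (the field name and id value here are illustrative):
///
/// ```rust
/// # use std::collections::HashMap;
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
/// // Attach the field id as string metadata under this key; it is read back
/// // when the Arrow schema is converted to a Parquet schema
/// let field = Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
///     PARQUET_FIELD_ID_META_KEY.to_string(),
///     "1".to_string(),
/// )]));
/// assert_eq!(field.metadata()[PARQUET_FIELD_ID_META_KEY], "1");
/// ```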
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// Has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// For non-nested schemas, i.e. those containing only primitive columns, the root
/// and leaves are the same
///
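/// A minimal sketch of selecting leaves and roots from a schema like the one
/// above (the exact schema string and indices here are illustrative):
///
/// ```rust
/// # use std::sync::Arc;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let message_type = "
///   message schema {
///     REQUIRED BOOLEAN leaf_1;
///     REQUIRED group group_a {
///       OPTIONAL INT32 leaf_2;
///       OPTIONAL INT64 leaf_3;
///     }
///   }
/// ";
/// let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
///
/// // Select leaves 0 and 2 (`leaf_1` and `leaf_3`)
/// let leaves = ProjectionMask::leaves(&descriptor, [0, 2]);
/// assert!(leaves.leaf_included(0) && !leaves.leaf_included(1) && leaves.leaf_included(2));
///
/// // Select root 1 (`group_a`), which covers leaves 1 and 2
/// let roots = ProjectionMask::roots(&descriptor, [1]);
/// assert!(!roots.leaf_included(0) && roots.leaf_included(1) && roots.leaf_included(2));
/// ```
///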
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If present a leaf column should be included if the value at
    /// the corresponding index is true
    ///
    /// If `None`, include all columns
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] which selects all columns
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] which selects only the specified leaf columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] which selects only the specified root columns
    ///
    /// Note: repeated or out of order indices will not impact the final mask
    ///
    /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }

        let mask = (0..schema.num_columns())
            .map(|leaf_idx| {
                let root_idx = schema.get_column_root_idx(leaf_idx);
                root_mask[root_idx]
            })
            .collect();

        Self { mask: Some(mask) }
    }

    // Given a starting point in the schema, do a DFS for that node adding leaf paths to `paths`.
    fn find_leaves(root: &Arc<Type>, parent: Option<&String>, paths: &mut Vec<String>) {
        let path = parent
            .map(|p| [p, root.name()].join("."))
            .unwrap_or(root.name().to_string());
        if root.is_group() {
            for child in root.get_fields() {
                Self::find_leaves(child, Some(&path), paths);
            }
        } else {
            // Reached a leaf, add to paths
            paths.push(path);
        }
    }

    /// Create a [`ProjectionMask`] which selects only the named columns
    ///
    /// All leaf columns that fall below a given name will be selected. For example, given
    /// the schema
    /// ```ignore
    /// message schema {
    ///   OPTIONAL group a (MAP) {
    ///     REPEATED group key_value {
    ///       REQUIRED BYTE_ARRAY key (UTF8);  // leaf index 0
    ///       OPTIONAL group value (MAP) {
    ///         REPEATED group key_value {
    ///           REQUIRED INT32 key;     // leaf index 1
    ///           REQUIRED BOOLEAN value; // leaf index 2
    ///         }
    ///       }
    ///     }
    ///   }
    ///   REQUIRED INT32 b;  // leaf index 3
    ///   REQUIRED DOUBLE c; // leaf index 4
    /// }
    /// ```
    /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return
    /// columns 0, 1, and 2.
    ///
    /// Note: repeated or out of order names will not impact the final mask.
    ///
    /// i.e. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`.
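    ///
    /// A minimal sketch using the schema above:
    /// ```rust
    /// # use std::sync::Arc;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
    /// # let message_type = "
    /// #   message schema {
    /// #     OPTIONAL group a (MAP) {
    /// #       REPEATED group key_value {
    /// #         REQUIRED BYTE_ARRAY key (UTF8);
    /// #         OPTIONAL group value (MAP) {
    /// #           REPEATED group key_value {
    /// #             REQUIRED INT32 key;
    /// #             REQUIRED BOOLEAN value;
    /// #           }
    /// #         }
    /// #       }
    /// #     }
    /// #     REQUIRED INT32 b;
    /// #     REQUIRED DOUBLE c;
    /// #   }
    /// # ";
    /// # let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    /// // selects leaf columns 1, 2 and 4
    /// let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "c"]);
    /// assert!(mask.leaf_included(1) && mask.leaf_included(2) && mask.leaf_included(4));
    /// assert!(!mask.leaf_included(0) && !mask.leaf_included(3));
    /// ```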
    pub fn columns<'a>(
        schema: &SchemaDescriptor,
        names: impl IntoIterator<Item = &'a str>,
    ) -> Self {
        // first make vector of paths for leaf columns
        let mut paths: Vec<String> = vec![];
        for root in schema.root_schema().get_fields() {
            Self::find_leaves(root, None, &mut paths);
        }
        assert_eq!(paths.len(), schema.num_columns());

        let mut mask = vec![false; schema.num_columns()];
        for name in names {
            for idx in 0..schema.num_columns() {
                if paths[idx].starts_with(name) {
                    mask[idx] = true;
                }
            }
        }

        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf column `leaf_idx` is included by the mask
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }

    /// Union two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// union(mask1, mask2) = [true, true, true]
    /// ```
    pub fn union(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) | (_, None) => self.mask = None,
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
                self.mask = Some(mask);
            }
        }
    }

    /// Intersect two projection masks
    ///
    /// Example:
    /// ```text
    /// mask1 = [true, false, true]
    /// mask2 = [false, true, true]
    /// intersect(mask1, mask2) = [false, false, true]
    /// ```
    pub fn intersect(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) => self.mask = other.mask.clone(),
            (_, None) => {}
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
                self.mask = Some(mask);
            }
        }
    }
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
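///
/// A minimal sketch (the schema and field names here are illustrative), assuming
/// the Parquet schema was produced from the Arrow schema:
/// ```rust
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::{parquet_column, ArrowSchemaConverter};
/// let arrow_schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
/// // Find the parquet leaf column corresponding to the arrow field "name"
/// let (parquet_idx, field) = parquet_column(&parquet_schema, &arrow_schema, "name").unwrap();
/// assert_eq!(parquet_idx, 1);
/// assert_eq!(field.name(), "name");
/// ```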
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;
    if field.data_type().is_nested() {
        // Nested fields are not supported and require non-trivial logic
        // to correctly walk the parquet schema accounting for the
        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
        //
        // For example a ListArray could correspond to anything from 1 to 3 levels
        // in the parquet schema
        return None;
    }

    // This could be made more efficient (#TBD)
    let parquet_idx = (0..parquet_schema.columns().len())
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    use super::ProjectionMask;

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // this should error because the page indexes are not present, but have offsets specified
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
        );
    }

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Write a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    #[test]
    fn test_mask_from_column_names() {
        let message_type = "
            message test_schema {
                OPTIONAL group a (MAP) {
                    REPEATED group key_value {
                        REQUIRED BYTE_ARRAY key (UTF8);
                        OPTIONAL group value (MAP) {
                            REPEATED group key_value {
                                REQUIRED INT32 key;
                                REQUIRED BOOLEAN value;
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
                REQUIRED DOUBLE c;
            }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, []);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);

        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL group a (LIST) {
                    REPEATED group list {
                        OPTIONAL group element (LIST) {
                            REPEATED group list {
                                OPTIONAL group element (LIST) {
                                    REPEATED group list {
                                        OPTIONAL BYTE_ARRAY element (UTF8);
                                    }
                                }
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
            }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask =
            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["b"]);
        assert_eq!(mask.mask.unwrap(), [false, true]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 c;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);

        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);

        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 a;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
    }

    #[test]
    fn test_projection_mask_union() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, true, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);
    }

    #[test]
    fn test_projection_mask_intersect() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, true, true]));

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, None);
    }
}