parquet/arrow/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! API for reading/writing Arrow [`RecordBatch`]es and [`Array`]s to/from
19//! Parquet Files.
20//!
21//! See the [crate-level documentation](crate) for more details on other APIs
22//!
23//! # Schema Conversion
24//!
25//! These APIs ensure that data in Arrow [`RecordBatch`]es written to Parquet are
26//! read back as [`RecordBatch`]es with the exact same types and values.
27//!
28//! Parquet and Arrow have different type systems, and there is not
29//! always a one to one mapping between the systems. For example, data
30//! stored as a Parquet [`BYTE_ARRAY`] can be read as either an Arrow
31//! [`BinaryViewArray`] or [`BinaryArray`].
32//!
33//! To recover the original Arrow types, the writers in this module add a "hint" to
34//! the metadata in the [`ARROW_SCHEMA_META_KEY`] key which records the original Arrow
35//! schema. The metadata hint follows the same convention as arrow-cpp based
36//! implementations such as `pyarrow`. The reader looks for the schema hint in the
37//! metadata to determine Arrow types, and if it is not present, infers the Arrow schema
38//! from the Parquet schema.
39//!
40//! In situations where the embedded Arrow schema is not compatible with the Parquet
41//! schema, the Parquet schema takes precedence and no error is raised.
42//! See [#1663](https://github.com/apache/arrow-rs/issues/1663)
43//!
44//! You can also control the type conversion process in more detail using:
45//!
//! * [`ArrowSchemaConverter`] to control the conversion of Arrow types to Parquet
//!   types.
48//!
49//! * [`ArrowReaderOptions::with_schema`] to explicitly specify your own Arrow schema hint
50//! to use when reading Parquet, overriding any metadata that may be present.
51//!
52//! [`RecordBatch`]: arrow_array::RecordBatch
53//! [`Array`]: arrow_array::Array
54//! [`BYTE_ARRAY`]: crate::basic::Type::BYTE_ARRAY
55//! [`BinaryViewArray`]: arrow_array::BinaryViewArray
56//! [`BinaryArray`]: arrow_array::BinaryArray
57//! [`ArrowReaderOptions::with_schema`]: arrow_reader::ArrowReaderOptions::with_schema
58//!
59//! # Example: Writing Arrow `RecordBatch` to Parquet file
60//!
61//!```rust
62//! # use arrow_array::{Int32Array, ArrayRef};
63//! # use arrow_array::RecordBatch;
64//! # use parquet::arrow::arrow_writer::ArrowWriter;
65//! # use parquet::file::properties::WriterProperties;
66//! # use tempfile::tempfile;
67//! # use std::sync::Arc;
68//! # use parquet::basic::Compression;
69//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
70//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
71//! let batch = RecordBatch::try_from_iter(vec![
72//! ("id", Arc::new(ids) as ArrayRef),
73//! ("val", Arc::new(vals) as ArrayRef),
74//! ]).unwrap();
75//!
76//! let file = tempfile().unwrap();
77//!
78//! // WriterProperties can be used to set Parquet file options
79//! let props = WriterProperties::builder()
80//! .set_compression(Compression::SNAPPY)
81//! .build();
82//!
83//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
84//!
85//! writer.write(&batch).expect("Writing batch");
86//!
87//! // writer must be closed to write footer
88//! writer.close().unwrap();
89//! ```
90//!
91//! # Example: Reading Parquet file into Arrow `RecordBatch`
92//!
93//! ```rust
94//! # use std::fs::File;
95//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
96//! # use std::sync::Arc;
97//! # use arrow_array::Int32Array;
98//! # use arrow::datatypes::{DataType, Field, Schema};
99//! # use arrow_array::RecordBatch;
100//! # use parquet::arrow::arrow_writer::ArrowWriter;
101//! #
102//! # let ids = Int32Array::from(vec![1, 2, 3, 4]);
103//! # let schema = Arc::new(Schema::new(vec![
104//! # Field::new("id", DataType::Int32, false),
105//! # ]));
106//! #
107//! # let file = File::create("data.parquet").unwrap();
108//! #
109//! # let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();
110//! # let batches = vec![batch];
111//! #
112//! # let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
113//! #
114//! # for batch in batches {
115//! # writer.write(&batch).expect("Writing batch");
116//! # }
117//! # writer.close().unwrap();
118//! #
119//! let file = File::open("data.parquet").unwrap();
120//!
121//! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
122//! println!("Converted arrow schema is: {}", builder.schema());
123//!
124//! let mut reader = builder.build().unwrap();
125//!
126//! let record_batch = reader.next().unwrap().unwrap();
127//!
128//! println!("Read {} records.", record_batch.num_rows());
129//! ```
130//!
131//! # Example: Reading non-uniformly encrypted parquet file into arrow record batch
132//!
133//! Note: This requires the experimental `encryption` feature to be enabled at compile time.
134//!
135#![cfg_attr(feature = "encryption", doc = "```rust")]
136#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
137//! # use arrow_array::{Int32Array, ArrayRef};
138//! # use arrow_array::{types, RecordBatch};
139//! # use parquet::arrow::arrow_reader::{
140//! # ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
141//! # };
142//! # use arrow_array::cast::AsArray;
143//! # use parquet::file::metadata::ParquetMetaData;
144//! # use tempfile::tempfile;
145//! # use std::fs::File;
146//! # use parquet::encryption::decrypt::FileDecryptionProperties;
147//! # let test_data = arrow::util::test_util::parquet_test_data();
148//! # let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
149//! #
150//! let file = File::open(path).unwrap();
151//!
//! // Define the AES encryption keys required for decrypting the footer metadata
153//! // and column-specific data. If only a footer key is used then it is assumed that the
154//! // file uses uniform encryption and all columns are encrypted with the footer key.
155//! // If any column keys are specified, other columns without a key provided are assumed
156//! // to be unencrypted
157//! let footer_key = "0123456789012345".as_bytes(); // Keys are 128 bits (16 bytes)
158//! let column_1_key = "1234567890123450".as_bytes();
159//! let column_2_key = "1234567890123451".as_bytes();
160//!
161//! let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
162//! .with_column_key("double_field", column_1_key.to_vec())
163//! .with_column_key("float_field", column_2_key.to_vec())
164//! .build()
165//! .unwrap();
166//!
167//! let options = ArrowReaderOptions::default()
168//! .with_file_decryption_properties(decryption_properties);
169//! let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
170//! let file_metadata = reader_metadata.metadata().file_metadata();
171//! assert_eq!(50, file_metadata.num_rows());
172//!
173//! let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
174//! .unwrap()
175//! .build()
176//! .unwrap();
177//!
178//! let record_batch = reader.next().unwrap().unwrap();
179//! assert_eq!(50, record_batch.num_rows());
180//! ```
181
182experimental!(mod array_reader);
183pub mod arrow_reader;
184pub mod arrow_writer;
185mod buffer;
186mod decoder;
187
188#[cfg(feature = "async")]
189pub mod async_reader;
190#[cfg(feature = "async")]
191pub mod async_writer;
192
193pub mod push_decoder;
194
195mod in_memory_row_group;
196mod record_reader;
197
198experimental!(mod schema);
199
200use std::fmt::Debug;
201
202pub use self::arrow_writer::ArrowWriter;
203#[cfg(feature = "async")]
204pub use self::async_reader::ParquetRecordBatchStreamBuilder;
205#[cfg(feature = "async")]
206pub use self::async_writer::AsyncArrowWriter;
207use crate::schema::types::SchemaDescriptor;
208use arrow_schema::{FieldRef, Schema};
209
210pub use self::schema::{
211 ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
212 parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual,
213 parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*,
214};
215
/// Schema metadata key used to store serialized Arrow schema
///
/// The Arrow schema is encoded using the Arrow IPC format, and then base64
/// encoded. This is the same format used by arrow-cpp systems, such as pyarrow.
///
/// As described in the module-level documentation, the writers in this module
/// store the original Arrow schema under this key as a "hint", and the reader
/// consults it (when present) to determine the Arrow types.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
221
/// Field metadata key holding the Parquet field id
///
/// The value of this metadata key, if present on [`Field::metadata`], will be used
/// to populate [`BasicTypeInfo::id`]
///
/// [`Field::metadata`]: arrow_schema::Field::metadata
/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
228
/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
/// or root column indices where:
///
/// * Root columns are the direct children of the root schema, enumerated in order
/// * Leaf columns are the child-less leaves of the schema as enumerated by a depth-first search
///
/// For example, the schema
///
/// ```ignore
/// message schema {
///   REQUIRED boolean         leaf_1;
///   REQUIRED GROUP group {
///     OPTIONAL int32 leaf_2;
///     OPTIONAL int64 leaf_3;
///   }
/// }
/// ```
///
/// Has roots `["leaf_1", "group"]` and leaves `["leaf_1", "leaf_2", "leaf_3"]`
///
/// For non-nested schemas, i.e. those containing only primitive columns, the root
/// and leaves are the same
///
/// Masks are created with [`Self::all`], [`Self::none`], [`Self::leaves`],
/// [`Self::roots`] or [`Self::columns`], queried with [`Self::leaf_included`],
/// and combined in place with [`Self::union`] and [`Self::intersect`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// If `Some`, a leaf column should be included if the value at
    /// the corresponding index is true. The vector is indexed by leaf
    /// column index.
    ///
    /// If `None`, all columns should be included
    ///
    /// # Examples
    ///
    /// Given an original parquet schema whose leaf columns are `[a, b, c, d]`
    ///
    /// A mask of `[true, false, true, false]` will result in a schema 2
    /// elements long:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `c`
    ///
    /// A mask of `None` will result in a schema 4 elements long:
    /// * `fields[0]`: `a`
    /// * `fields[1]`: `b`
    /// * `fields[2]`: `c`
    /// * `fields[3]`: `d`
    mask: Option<Vec<bool>>,
}
277
278impl ProjectionMask {
279 /// Create a [`ProjectionMask`] which selects all columns
280 pub fn all() -> Self {
281 Self { mask: None }
282 }
283
284 /// Create a [`ProjectionMask`] which selects no columns
285 pub fn none(len: usize) -> Self {
286 Self {
287 mask: Some(vec![false; len]),
288 }
289 }
290
291 /// Create a [`ProjectionMask`] which selects only the specified leaf columns
292 ///
293 /// Note: repeated or out of order indices will not impact the final mask
294 ///
295 /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
296 pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
297 let mut mask = vec![false; schema.num_columns()];
298 for leaf_idx in indices {
299 mask[leaf_idx] = true;
300 }
301 Self { mask: Some(mask) }
302 }
303
304 /// Create a [`ProjectionMask`] which selects only the specified root columns
305 ///
306 /// Note: repeated or out of order indices will not impact the final mask
307 ///
308 /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
309 pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
310 let num_root_columns = schema.root_schema().get_fields().len();
311 let mut root_mask = vec![false; num_root_columns];
312 for root_idx in indices {
313 root_mask[root_idx] = true;
314 }
315
316 let mask = (0..schema.num_columns())
317 .map(|leaf_idx| {
318 let root_idx = schema.get_column_root_idx(leaf_idx);
319 root_mask[root_idx]
320 })
321 .collect();
322
323 Self { mask: Some(mask) }
324 }
325
326 /// Create a [`ProjectionMask`] which selects only the named columns
327 ///
328 /// All leaf columns that fall below a given name will be selected. For example, given
329 /// the schema
330 /// ```ignore
331 /// message schema {
332 /// OPTIONAL group a (MAP) {
333 /// REPEATED group key_value {
334 /// REQUIRED BYTE_ARRAY key (UTF8); // leaf index 0
335 /// OPTIONAL group value (MAP) {
336 /// REPEATED group key_value {
337 /// REQUIRED INT32 key; // leaf index 1
338 /// REQUIRED BOOLEAN value; // leaf index 2
339 /// }
340 /// }
341 /// }
342 /// }
343 /// REQUIRED INT32 b; // leaf index 3
344 /// REQUIRED DOUBLE c; // leaf index 4
345 /// }
346 /// ```
347 /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return
348 /// columns 0, 1, and 2.
349 ///
350 /// Note: repeated or out of order indices will not impact the final mask.
351 ///
352 /// i.e. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`.
353 ///
354 /// Also, this will not produce the desired results if a column contains a '.' in its name.
355 /// Use [`Self::leaves`] or [`Self::roots`] in that case.
356 pub fn columns<'a>(
357 schema: &SchemaDescriptor,
358 names: impl IntoIterator<Item = &'a str>,
359 ) -> Self {
360 let mut mask = vec![false; schema.num_columns()];
361 for name in names {
362 let name_path: Vec<&str> = name.split('.').collect();
363 for (idx, col) in schema.columns().iter().enumerate() {
364 let path = col.path().parts();
365 // searching for "a.b.c" cannot match "a.b"
366 if name_path.len() > path.len() {
367 continue;
368 }
369 // now path >= name_path, so check that each element in name_path matches
370 if name_path.iter().zip(path.iter()).all(|(a, b)| a == b) {
371 mask[idx] = true;
372 }
373 }
374 }
375
376 Self { mask: Some(mask) }
377 }
378
379 /// Returns true if the leaf column `leaf_idx` is included by the mask
380 pub fn leaf_included(&self, leaf_idx: usize) -> bool {
381 self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
382 }
383
384 /// Union two projection masks
385 ///
386 /// Example:
387 /// ```text
388 /// mask1 = [true, false, true]
389 /// mask2 = [false, true, true]
390 /// union(mask1, mask2) = [true, true, true]
391 /// ```
392 pub fn union(&mut self, other: &Self) {
393 match (self.mask.as_ref(), other.mask.as_ref()) {
394 (None, _) | (_, None) => self.mask = None,
395 (Some(a), Some(b)) => {
396 debug_assert_eq!(a.len(), b.len());
397 let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
398 self.mask = Some(mask);
399 }
400 }
401 }
402
403 /// Intersect two projection masks
404 ///
405 /// Example:
406 /// ```text
407 /// mask1 = [true, false, true]
408 /// mask2 = [false, true, true]
409 /// intersect(mask1, mask2) = [false, false, true]
410 /// ```
411 pub fn intersect(&mut self, other: &Self) {
412 match (self.mask.as_ref(), other.mask.as_ref()) {
413 (None, _) => self.mask = other.mask.clone(),
414 (_, None) => {}
415 (Some(a), Some(b)) => {
416 debug_assert_eq!(a.len(), b.len());
417 let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
418 self.mask = Some(mask);
419 }
420 }
421 }
422
423 /// Return a new [`ProjectionMask`] that excludes any leaf columns that are
424 /// part of a nested type, such as struct, list, or map
425 ///
426 /// If there are no non-nested columns in the mask, returns `None`
427 pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option<Self> {
428 let num_leaves = schema.num_columns();
429
430 // Count how many leaves each root column has
431 let num_roots = schema.root_schema().get_fields().len();
432 let mut root_leaf_counts = vec![0usize; num_roots];
433 for leaf_idx in 0..num_leaves {
434 let root_idx = schema.get_column_root_idx(leaf_idx);
435 root_leaf_counts[root_idx] += 1;
436 }
437
438 // Keep only leaves whose root has exactly one leaf (non-nested) and is not a
439 // LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g.
440 // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
441 //
442 // ```text
443 // // List<String> (list non-null, elements nullable)
444 // required group my_list (LIST) {
445 // repeated group list {
446 // optional binary element (STRING);
447 // }
448 // }
449 // ```
450 let mut included_leaves = Vec::new();
451 for leaf_idx in 0..num_leaves {
452 if self.leaf_included(leaf_idx) {
453 let root = schema.get_column_root(leaf_idx);
454 let root_idx = schema.get_column_root_idx(leaf_idx);
455 if root_leaf_counts[root_idx] == 1 && !root.is_list() {
456 included_leaves.push(leaf_idx);
457 }
458 }
459 }
460
461 if included_leaves.is_empty() {
462 None
463 } else {
464 Some(ProjectionMask::leaves(schema, included_leaves))
465 }
466 }
467}
468
469/// Lookups up the parquet column by name
470///
471/// Returns the parquet column index and the corresponding arrow field
472pub fn parquet_column<'a>(
473 parquet_schema: &SchemaDescriptor,
474 arrow_schema: &'a Schema,
475 name: &str,
476) -> Option<(usize, &'a FieldRef)> {
477 let (root_idx, field) = arrow_schema.fields.find(name)?;
478 if field.data_type().is_nested() {
479 // Nested fields are not supported and require non-trivial logic
480 // to correctly walk the parquet schema accounting for the
481 // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
482 //
483 // For example a ListArray could correspond to anything from 1 to 3 levels
484 // in the parquet schema
485 return None;
486 }
487
488 // This could be made more efficient (#TBD)
489 let parquet_idx = (0..parquet_schema.columns().len())
490 .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
491 Some((parquet_idx, field))
492}
493
494#[cfg(test)]
495mod test {
496 use crate::arrow::ArrowWriter;
497 use crate::file::metadata::{
498 ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetMetaDataWriter,
499 };
500 use crate::file::properties::{EnabledStatistics, WriterProperties};
501 use crate::schema::parser::parse_message_type;
502 use crate::schema::types::SchemaDescriptor;
503 use arrow_array::{ArrayRef, Int32Array, RecordBatch};
504 use bytes::Bytes;
505 use std::sync::Arc;
506
507 use super::ProjectionMask;
508
509 #[test]
510 #[allow(deprecated)]
511 // Reproducer for https://github.com/apache/arrow-rs/issues/6464
512 fn test_metadata_read_write_partial_offset() {
513 let parquet_bytes = create_parquet_file();
514
515 // read the metadata from the file WITHOUT the page index structures
516 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
517 let original_metadata = ParquetMetaDataReader::new()
518 .with_metadata_options(Some(options))
519 .parse_and_finish(&parquet_bytes)
520 .unwrap();
521
522 // this should error because the page indexes are not present, but have offsets specified
523 let metadata_bytes = metadata_to_bytes(&original_metadata);
524 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
525 let err = ParquetMetaDataReader::new()
526 .with_metadata_options(Some(options))
527 .with_page_indexes(true) // there are no page indexes in the metadata
528 .parse_and_finish(&metadata_bytes)
529 .err()
530 .unwrap();
531 assert_eq!(
532 err.to_string(),
533 "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
534 );
535 }
536
537 #[test]
538 fn test_metadata_read_write_roundtrip() {
539 let parquet_bytes = create_parquet_file();
540
541 // read the metadata from the file
542 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
543 let original_metadata = ParquetMetaDataReader::new()
544 .with_metadata_options(Some(options))
545 .parse_and_finish(&parquet_bytes)
546 .unwrap();
547
548 // read metadata back from the serialized bytes and ensure it is the same
549 let metadata_bytes = metadata_to_bytes(&original_metadata);
550 assert_ne!(
551 metadata_bytes.len(),
552 parquet_bytes.len(),
553 "metadata is subset of parquet"
554 );
555
556 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
557 let roundtrip_metadata = ParquetMetaDataReader::new()
558 .with_metadata_options(Some(options))
559 .parse_and_finish(&metadata_bytes)
560 .unwrap();
561
562 assert_eq!(original_metadata, roundtrip_metadata);
563 }
564
565 #[test]
566 #[allow(deprecated)]
567 fn test_metadata_read_write_roundtrip_page_index() {
568 let parquet_bytes = create_parquet_file();
569
570 // read the metadata from the file including the page index structures
571 // (which are stored elsewhere in the footer)
572 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
573 let original_metadata = ParquetMetaDataReader::new()
574 .with_metadata_options(Some(options))
575 .with_page_indexes(true)
576 .parse_and_finish(&parquet_bytes)
577 .unwrap();
578
579 // read metadata back from the serialized bytes and ensure it is the same
580 let metadata_bytes = metadata_to_bytes(&original_metadata);
581 let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
582 let roundtrip_metadata = ParquetMetaDataReader::new()
583 .with_metadata_options(Some(options))
584 .with_page_indexes(true)
585 .parse_and_finish(&metadata_bytes)
586 .unwrap();
587
588 // Need to normalize the metadata first to remove offsets in data
589 let original_metadata = normalize_locations(original_metadata);
590 let roundtrip_metadata = normalize_locations(roundtrip_metadata);
591 assert_eq!(
592 format!("{original_metadata:#?}"),
593 format!("{roundtrip_metadata:#?}")
594 );
595 assert_eq!(original_metadata, roundtrip_metadata);
596 }
597
598 /// Sets the page index offset locations in the metadata to `None`
599 ///
600 /// This is because the offsets are used to find the relative location of the index
601 /// structures, and thus differ depending on how the structures are stored.
602 fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
603 let mut metadata_builder = metadata.into_builder();
604 for rg in metadata_builder.take_row_groups() {
605 let mut rg_builder = rg.into_builder();
606 for col in rg_builder.take_columns() {
607 rg_builder = rg_builder.add_column_metadata(
608 col.into_builder()
609 .set_offset_index_offset(None)
610 .set_index_page_offset(None)
611 .set_column_index_offset(None)
612 .build()
613 .unwrap(),
614 );
615 }
616 let rg = rg_builder.build().unwrap();
617 metadata_builder = metadata_builder.add_row_group(rg);
618 }
619 metadata_builder.build()
620 }
621
622 /// Write a parquet filed into an in memory buffer
623 fn create_parquet_file() -> Bytes {
624 let mut buf = vec![];
625 let data = vec![100, 200, 201, 300, 102, 33];
626 let array: ArrayRef = Arc::new(Int32Array::from(data));
627 let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
628 let props = WriterProperties::builder()
629 .set_statistics_enabled(EnabledStatistics::Page)
630 .set_write_page_header_statistics(true)
631 .build();
632
633 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
634 writer.write(&batch).unwrap();
635 writer.finish().unwrap();
636 drop(writer);
637
638 Bytes::from(buf)
639 }
640
641 /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetadataWriter
642 fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
643 let mut buf = vec![];
644 ParquetMetaDataWriter::new(&mut buf, metadata)
645 .finish()
646 .unwrap();
647 Bytes::from(buf)
648 }
649
650 #[test]
651 fn test_mask_from_column_names() {
652 let schema = parse_schema(
653 "
654 message test_schema {
655 OPTIONAL group a (MAP) {
656 REPEATED group key_value {
657 REQUIRED BYTE_ARRAY key (UTF8);
658 OPTIONAL group value (MAP) {
659 REPEATED group key_value {
660 REQUIRED INT32 key;
661 REQUIRED BOOLEAN value;
662 }
663 }
664 }
665 }
666 REQUIRED INT32 b;
667 REQUIRED DOUBLE c;
668 }
669 ",
670 );
671
672 let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
673 assert_eq!(mask.mask.unwrap(), vec![false; 5]);
674
675 let mask = ProjectionMask::columns(&schema, []);
676 assert_eq!(mask.mask.unwrap(), vec![false; 5]);
677
678 let mask = ProjectionMask::columns(&schema, ["a", "c"]);
679 assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);
680
681 let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
682 assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);
683
684 let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
685 assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);
686
687 let schema = parse_schema(
688 "
689 message test_schema {
690 OPTIONAL group a (LIST) {
691 REPEATED group list {
692 OPTIONAL group element (LIST) {
693 REPEATED group list {
694 OPTIONAL group element (LIST) {
695 REPEATED group list {
696 OPTIONAL BYTE_ARRAY element (UTF8);
697 }
698 }
699 }
700 }
701 }
702 }
703 REQUIRED INT32 b;
704 }
705 ",
706 );
707
708 let mask = ProjectionMask::columns(&schema, ["a", "b"]);
709 assert_eq!(mask.mask.unwrap(), [true, true]);
710
711 let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
712 assert_eq!(mask.mask.unwrap(), [true, true]);
713
714 let mask =
715 ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
716 assert_eq!(mask.mask.unwrap(), [true, true]);
717
718 let mask = ProjectionMask::columns(&schema, ["b"]);
719 assert_eq!(mask.mask.unwrap(), [false, true]);
720
721 let schema = parse_schema(
722 "
723 message test_schema {
724 OPTIONAL INT32 a;
725 OPTIONAL INT32 b;
726 OPTIONAL INT32 c;
727 OPTIONAL INT32 d;
728 OPTIONAL INT32 e;
729 }
730 ",
731 );
732
733 let mask = ProjectionMask::columns(&schema, ["a", "b"]);
734 assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);
735
736 let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
737 assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);
738
739 let schema = parse_schema(
740 "
741 message test_schema {
742 OPTIONAL INT32 a;
743 OPTIONAL INT32 b;
744 OPTIONAL INT32 a;
745 OPTIONAL INT32 d;
746 OPTIONAL INT32 e;
747 }
748 ",
749 );
750
751 let mask = ProjectionMask::columns(&schema, ["a", "e"]);
752 assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
753
754 let schema = parse_schema(
755 "
756 message test_schema {
757 OPTIONAL INT32 a;
758 OPTIONAL INT32 aa;
759 }
760 ",
761 );
762
763 let mask = ProjectionMask::columns(&schema, ["a"]);
764 assert_eq!(mask.mask.unwrap(), [true, false]);
765 }
766
767 #[test]
768 fn test_projection_mask_union() {
769 let mut mask1 = ProjectionMask {
770 mask: Some(vec![true, false, true]),
771 };
772 let mask2 = ProjectionMask {
773 mask: Some(vec![false, true, true]),
774 };
775 mask1.union(&mask2);
776 assert_eq!(mask1.mask, Some(vec![true, true, true]));
777
778 let mut mask1 = ProjectionMask { mask: None };
779 let mask2 = ProjectionMask {
780 mask: Some(vec![false, true, true]),
781 };
782 mask1.union(&mask2);
783 assert_eq!(mask1.mask, None);
784
785 let mut mask1 = ProjectionMask {
786 mask: Some(vec![true, false, true]),
787 };
788 let mask2 = ProjectionMask { mask: None };
789 mask1.union(&mask2);
790 assert_eq!(mask1.mask, None);
791
792 let mut mask1 = ProjectionMask { mask: None };
793 let mask2 = ProjectionMask { mask: None };
794 mask1.union(&mask2);
795 assert_eq!(mask1.mask, None);
796 }
797
798 #[test]
799 fn test_projection_mask_intersect() {
800 let mut mask1 = ProjectionMask {
801 mask: Some(vec![true, false, true]),
802 };
803 let mask2 = ProjectionMask {
804 mask: Some(vec![false, true, true]),
805 };
806 mask1.intersect(&mask2);
807 assert_eq!(mask1.mask, Some(vec![false, false, true]));
808
809 let mut mask1 = ProjectionMask { mask: None };
810 let mask2 = ProjectionMask {
811 mask: Some(vec![false, true, true]),
812 };
813 mask1.intersect(&mask2);
814 assert_eq!(mask1.mask, Some(vec![false, true, true]));
815
816 let mut mask1 = ProjectionMask {
817 mask: Some(vec![true, false, true]),
818 };
819 let mask2 = ProjectionMask { mask: None };
820 mask1.intersect(&mask2);
821 assert_eq!(mask1.mask, Some(vec![true, false, true]));
822
823 let mut mask1 = ProjectionMask { mask: None };
824 let mask2 = ProjectionMask { mask: None };
825 mask1.intersect(&mask2);
826 assert_eq!(mask1.mask, None);
827 }
828
829 #[test]
830 fn test_projection_mask_without_nested_no_nested() {
831 // Schema with no nested types
832 let schema = parse_schema(
833 "
834 message test_schema {
835 OPTIONAL INT32 a;
836 OPTIONAL INT32 b;
837 REQUIRED DOUBLE d;
838 }
839 ",
840 );
841
842 let mask = ProjectionMask::all();
843 // All columns are non-nested, but without_nested_types returns a new mask
844 assert_eq!(
845 Some(ProjectionMask::leaves(&schema, [0, 1, 2])),
846 mask.without_nested_types(&schema)
847 );
848
849 // select b, c
850 let mask = ProjectionMask::leaves(&schema, [1, 2]);
851 assert_eq!(Some(mask.clone()), mask.without_nested_types(&schema));
852 }
853
854 #[test]
855 fn test_projection_mask_without_nested_nested() {
856 // Schema with nested types (structs)
857 let schema = parse_schema(
858 "
859 message test_schema {
860 OPTIONAL INT32 a;
861 OPTIONAL group b {
862 REQUIRED INT32 b1;
863 OPTIONAL INT64 b2;
864 }
865 OPTIONAL group c (LIST) {
866 REPEATED group list {
867 OPTIONAL INT32 element;
868 }
869 }
870 REQUIRED DOUBLE d;
871 }
872 ",
873 );
874
875 // all leaves --> a, d
876 let mask = ProjectionMask::all();
877 assert_eq!(
878 Some(ProjectionMask::leaves(&schema, [0, 4])),
879 mask.without_nested_types(&schema)
880 );
881
882 // b1 --> empty (it is nested)
883 let mask = ProjectionMask::leaves(&schema, [1]);
884 assert_eq!(None, mask.without_nested_types(&schema));
885
886 // b2, d --> d
887 let mask = ProjectionMask::leaves(&schema, [1, 4]);
888 assert_eq!(
889 Some(ProjectionMask::leaves(&schema, [4])),
890 mask.without_nested_types(&schema)
891 );
892
893 // element --> empty (it is nested)
894 let mask = ProjectionMask::leaves(&schema, [3]);
895 assert_eq!(None, mask.without_nested_types(&schema));
896 }
897
898 #[test]
899 fn test_projection_mask_without_nested_map_only() {
900 // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
901 let schema = parse_schema(
902 "
903 message test_schema {
904 required group my_map (MAP) {
905 repeated group key_value {
906 required binary key (STRING);
907 optional int32 value;
908 }
909 }
910 }
911 ",
912 );
913
914 let mask = ProjectionMask::all();
915 assert_eq!(None, mask.without_nested_types(&schema));
916
917 // key --> empty (it is nested)
918 let mask = ProjectionMask::leaves(&schema, [0]);
919 assert_eq!(None, mask.without_nested_types(&schema));
920
921 // value --> empty (it is nested)
922 let mask = ProjectionMask::leaves(&schema, [1]);
923 assert_eq!(None, mask.without_nested_types(&schema));
924 }
925
926 #[test]
927 fn test_projection_mask_without_nested_map_with_non_nested() {
928 // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
929 // with an additional non-nested field
930 let schema = parse_schema(
931 "
932 message test_schema {
933 REQUIRED INT32 a;
934 required group my_map (MAP) {
935 repeated group key_value {
936 required binary key (STRING);
937 optional int32 value;
938 }
939 }
940 REQUIRED INT32 b;
941 }
942 ",
943 );
944
945 // all leaves --> a, b which are the only non nested ones
946 let mask = ProjectionMask::all();
947 assert_eq!(
948 Some(ProjectionMask::leaves(&schema, [0, 3])),
949 mask.without_nested_types(&schema)
950 );
951
952 // key, value, b --> b (the only non-nested one)
953 let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
954 assert_eq!(
955 Some(ProjectionMask::leaves(&schema, [3])),
956 mask.without_nested_types(&schema)
957 );
958
959 // key, value --> NONE
960 let mask = ProjectionMask::leaves(&schema, [1, 2]);
961 assert_eq!(None, mask.without_nested_types(&schema));
962 }
963
964 #[test]
965 fn test_projection_mask_without_nested_deeply_nested() {
966 // Map of Maps
967 let schema = parse_schema(
968 "
969 message test_schema {
970 OPTIONAL group a (MAP) {
971 REPEATED group key_value {
972 REQUIRED BYTE_ARRAY key (UTF8);
973 OPTIONAL group value (MAP) {
974 REPEATED group key_value {
975 REQUIRED INT32 key;
976 REQUIRED BOOLEAN value;
977 }
978 }
979 }
980 }
981 REQUIRED INT32 b;
982 REQUIRED DOUBLE c;
983 ",
984 );
985
986 let mask = ProjectionMask::all();
987 assert_eq!(
988 Some(ProjectionMask::leaves(&schema, [3, 4])),
989 mask.without_nested_types(&schema)
990 );
991
992 // (first) key, c --> c (the only non-nested one)
993 let mask = ProjectionMask::leaves(&schema, [0, 4]);
994 assert_eq!(
995 Some(ProjectionMask::leaves(&schema, [4])),
996 mask.without_nested_types(&schema)
997 );
998
999 // (second) key, value, b --> b (the only non-nested one)
1000 let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
1001 assert_eq!(
1002 Some(ProjectionMask::leaves(&schema, [3])),
1003 mask.without_nested_types(&schema)
1004 );
1005
1006 // key --> NONE (the only non-nested one)
1007 let mask = ProjectionMask::leaves(&schema, [0]);
1008 assert_eq!(None, mask.without_nested_types(&schema));
1009 }
1010
1011 #[test]
1012 fn test_projection_mask_without_nested_list() {
1013 // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
1014 let schema = parse_schema(
1015 "
1016 message test_schema {
1017 required group my_list (LIST) {
1018 repeated group list {
1019 optional binary element (STRING);
1020 }
1021 }
1022 REQUIRED INT32 b;
1023 }
1024 ",
1025 );
1026
1027 let mask = ProjectionMask::all();
1028 assert_eq!(
1029 Some(ProjectionMask::leaves(&schema, [1])),
1030 mask.without_nested_types(&schema),
1031 );
1032
1033 // element --> empty (it is nested)
1034 let mask = ProjectionMask::leaves(&schema, [0]);
1035 assert_eq!(None, mask.without_nested_types(&schema));
1036
1037 // element, b --> b (it is nested)
1038 let mask = ProjectionMask::leaves(&schema, [0, 1]);
1039 assert_eq!(
1040 Some(ProjectionMask::leaves(&schema, [1])),
1041 mask.without_nested_types(&schema),
1042 );
1043 }
1044
1045 /// Converts a schema string into a `SchemaDescriptor`
1046 fn parse_schema(schema: &str) -> SchemaDescriptor {
1047 let parquet_group_type = parse_message_type(schema).unwrap();
1048 SchemaDescriptor::new(Arc::new(parquet_group_type))
1049 }
1050}