1experimental!(mod array_reader);
98pub mod arrow_reader;
99pub mod arrow_writer;
100mod buffer;
101mod decoder;
102
103#[cfg(feature = "async")]
104pub mod async_reader;
105#[cfg(feature = "async")]
106pub mod async_writer;
107
108mod record_reader;
109experimental!(mod schema);
110
111use std::sync::Arc;
112
113pub use self::arrow_writer::ArrowWriter;
114#[cfg(feature = "async")]
115pub use self::async_reader::ParquetRecordBatchStreamBuilder;
116#[cfg(feature = "async")]
117pub use self::async_writer::AsyncArrowWriter;
118use crate::schema::types::{SchemaDescriptor, Type};
119use arrow_schema::{FieldRef, Schema};
120
121#[allow(deprecated)]
123pub use self::schema::arrow_to_parquet_schema;
124
125pub use self::schema::{
126 add_encoded_arrow_schema_to_metadata, encode_arrow_schema, parquet_to_arrow_field_levels,
127 parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, ArrowSchemaConverter, FieldLevels,
128};
129
/// Metadata key, `"ARROW:schema"`, reserved for an encoded Arrow schema.
// NOTE(review): presumably the key used by `add_encoded_arrow_schema_to_metadata`
// and read back when converting a parquet schema to Arrow — confirm in `schema`.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";

/// Arrow field metadata key, `"PARQUET:field_id"`, carrying a parquet field id.
// NOTE(review): presumably consulted when mapping between Arrow fields and
// parquet types — confirm in `schema`.
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
139
/// A selection of leaf columns within a (potentially nested) parquet schema.
///
/// A mask is built with [`ProjectionMask::all`], [`ProjectionMask::leaves`],
/// [`ProjectionMask::roots`] or [`ProjectionMask::columns`]. Masks can be combined
/// with [`ProjectionMask::union`] and [`ProjectionMask::intersect`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ProjectionMask {
    /// One flag per leaf column, indexed by leaf position in the schema; `true`
    /// means the leaf is selected. `None` selects every leaf without needing to
    /// know how many leaves the schema has.
    mask: Option<Vec<bool>>,
}
173
174impl ProjectionMask {
175 pub fn all() -> Self {
177 Self { mask: None }
178 }
179
180 pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
186 let mut mask = vec![false; schema.num_columns()];
187 for leaf_idx in indices {
188 mask[leaf_idx] = true;
189 }
190 Self { mask: Some(mask) }
191 }
192
193 pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
199 let num_root_columns = schema.root_schema().get_fields().len();
200 let mut root_mask = vec![false; num_root_columns];
201 for root_idx in indices {
202 root_mask[root_idx] = true;
203 }
204
205 let mask = (0..schema.num_columns())
206 .map(|leaf_idx| {
207 let root_idx = schema.get_column_root_idx(leaf_idx);
208 root_mask[root_idx]
209 })
210 .collect();
211
212 Self { mask: Some(mask) }
213 }
214
215 fn find_leaves(root: &Arc<Type>, parent: Option<&String>, paths: &mut Vec<String>) {
217 let path = parent
218 .map(|p| [p, root.name()].join("."))
219 .unwrap_or(root.name().to_string());
220 if root.is_group() {
221 for child in root.get_fields() {
222 Self::find_leaves(child, Some(&path), paths);
223 }
224 } else {
225 paths.push(path);
227 }
228 }
229
230 pub fn columns<'a>(
258 schema: &SchemaDescriptor,
259 names: impl IntoIterator<Item = &'a str>,
260 ) -> Self {
261 let mut paths: Vec<String> = vec![];
263 for root in schema.root_schema().get_fields() {
264 Self::find_leaves(root, None, &mut paths);
265 }
266 assert_eq!(paths.len(), schema.num_columns());
267
268 let mut mask = vec![false; schema.num_columns()];
269 for name in names {
270 for idx in 0..schema.num_columns() {
271 if paths[idx].starts_with(name) {
272 mask[idx] = true;
273 }
274 }
275 }
276
277 Self { mask: Some(mask) }
278 }
279
280 pub fn leaf_included(&self, leaf_idx: usize) -> bool {
282 self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
283 }
284
285 pub fn union(&mut self, other: &Self) {
294 match (self.mask.as_ref(), other.mask.as_ref()) {
295 (None, _) | (_, None) => self.mask = None,
296 (Some(a), Some(b)) => {
297 debug_assert_eq!(a.len(), b.len());
298 let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
299 self.mask = Some(mask);
300 }
301 }
302 }
303
304 pub fn intersect(&mut self, other: &Self) {
313 match (self.mask.as_ref(), other.mask.as_ref()) {
314 (None, _) => self.mask = other.mask.clone(),
315 (_, None) => {}
316 (Some(a), Some(b)) => {
317 debug_assert_eq!(a.len(), b.len());
318 let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
319 self.mask = Some(mask);
320 }
321 }
322 }
323}
324
325pub fn parquet_column<'a>(
329 parquet_schema: &SchemaDescriptor,
330 arrow_schema: &'a Schema,
331 name: &str,
332) -> Option<(usize, &'a FieldRef)> {
333 let (root_idx, field) = arrow_schema.fields.find(name)?;
334 if field.data_type().is_nested() {
335 return None;
342 }
343
344 let parquet_idx = (0..parquet_schema.columns().len())
346 .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
347 Some((parquet_idx, field))
348}
349
#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    use super::ProjectionMask;

    // Metadata read *without* page indexes is serialized on its own. Asking the
    // reader to load page indexes from that metadata-only buffer must then fail.
    #[test]
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // Read only the footer metadata; page indexes are not requested.
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        let metadata_bytes = metadata_to_bytes(&original_metadata);
        // The column chunks still carry page-index offsets (82..115). Those offsets
        // fall inside the region the reader treats as file metadata (0..341), so
        // the read is rejected with the error below. The exact byte ranges depend
        // on how `create_parquet_file` writes the file.
        let err = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .err()
            .unwrap();
        assert_eq!(
            err.to_string(),
            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
        );
    }

    // Footer metadata written by `ParquetMetaDataWriter` parses back to an
    // identical `ParquetMetaData`.
    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        let metadata_bytes = metadata_to_bytes(&original_metadata);
        // Sanity check: the serialized metadata is not simply the whole file.
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    // Same round trip as above, but with page indexes loaded on both sides.
    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Index offsets are cleared on both sides before comparing; presumably
        // they point to different byte positions in the two buffers.
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        // Compare the pretty-printed Debug output first so that a failure shows
        // a readable string diff, then compare the values themselves.
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    /// Rebuild `metadata` with each column chunk's `offset_index_offset`,
    /// `index_page_offset` and `column_index_offset` set to `None`. Row-group and
    /// column order are preserved.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }

    /// Write a small in-memory parquet file: one Int32 column `id` with six
    /// rows, written with page-level statistics enabled.
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        // Release the writer's borrow of `buf` before moving `buf` out.
        drop(writer);

        Bytes::from(buf)
    }

    /// Serialize `metadata` on its own, without any column data, using
    /// `ParquetMetaDataWriter`.
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    #[test]
    fn test_mask_from_column_names() {
        // Leaf order (mask index -> path):
        //   0 a.key_value.key
        //   1 a.key_value.value.key_value.key
        //   2 a.key_value.value.key_value.value
        //   3 b
        //   4 c
        let message_type = "
            message test_schema {
                OPTIONAL group a (MAP) {
                    REPEATED group key_value {
                        REQUIRED BYTE_ARRAY key (UTF8);
                        OPTIONAL group value (MAP) {
                            REPEATED group key_value {
                                REQUIRED INT32 key;
                                REQUIRED BOOLEAN value;
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
                REQUIRED DOUBLE c;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        // Unknown names select nothing.
        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        // No names select nothing (an explicit all-false mask, not `None`).
        let mask = ProjectionMask::columns(&schema, []);
        assert_eq!(mask.mask.unwrap(), vec![false; 5]);

        // A group name selects every leaf beneath it.
        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);

        // A full leaf path selects only that leaf.
        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);

        // An intermediate group path selects the nested map's leaves.
        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);

        // Triply nested list: a single leaf under `a`, plus `b`.
        let message_type = "
            message test_schema {
                OPTIONAL group a (LIST) {
                    REPEATED group list {
                        OPTIONAL group element (LIST) {
                            REPEATED group list {
                                OPTIONAL group element (LIST) {
                                    REPEATED group list {
                                        OPTIONAL BYTE_ARRAY element (UTF8);
                                    }
                                }
                            }
                        }
                    }
                }
                REQUIRED INT32 b;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        // Paths include the intermediate `list`/`element` groups.
        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask =
            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true]);

        let mask = ProjectionMask::columns(&schema, ["b"]);
        assert_eq!(mask.mask.unwrap(), [false, true]);

        // Flat schema: name order and duplicate names do not matter.
        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 c;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);

        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);

        // A name shared by two columns selects both of them.
        let message_type = "
            message test_schema {
                OPTIONAL INT32 a;
                OPTIONAL INT32 b;
                OPTIONAL INT32 a;
                OPTIONAL INT32 d;
                OPTIONAL INT32 e;
            }
            ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
    }

    // Union is element-wise OR; a `None` mask (select all) on either side wins.
    #[test]
    fn test_projection_mask_union() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, true, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.union(&mask2);
        assert_eq!(mask1.mask, None);
    }

    // Intersection is element-wise AND; a `None` mask (select all) acts as the
    // identity, so the other side is kept.
    #[test]
    fn test_projection_mask_intersect() {
        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask {
            mask: Some(vec![false, true, true]),
        };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![false, true, true]));

        let mut mask1 = ProjectionMask {
            mask: Some(vec![true, false, true]),
        };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, Some(vec![true, false, true]));

        let mut mask1 = ProjectionMask { mask: None };
        let mask2 = ProjectionMask { mask: None };
        mask1.intersect(&mask2);
        assert_eq!(mask1.mask, None);
    }
}