1use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26#[cfg(feature = "arrow_canonical_extension_types")]
27use arrow_schema::extension::{Json, Uuid};
28use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
29
30use crate::basic::{
31 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
32};
33use crate::errors::{ParquetError, Result};
34use crate::file::{metadata::KeyValue, properties::WriterProperties};
35use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
36
37mod complex;
38mod primitive;
39
40use crate::arrow::ProjectionMask;
41pub(crate) use complex::{ParquetField, ParquetFieldType};
42
43use super::PARQUET_FIELD_ID_META_KEY;
44
45pub fn parquet_to_arrow_schema(
50 parquet_schema: &SchemaDescriptor,
51 key_value_metadata: Option<&Vec<KeyValue>>,
52) -> Result<Schema> {
53 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
54}
55
56pub fn parquet_to_arrow_schema_by_columns(
59 parquet_schema: &SchemaDescriptor,
60 mask: ProjectionMask,
61 key_value_metadata: Option<&Vec<KeyValue>>,
62) -> Result<Schema> {
63 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
64}
65
66pub(crate) fn parquet_to_arrow_schema_and_fields(
68 parquet_schema: &SchemaDescriptor,
69 mask: ProjectionMask,
70 key_value_metadata: Option<&Vec<KeyValue>>,
71) -> Result<(Schema, Option<ParquetField>)> {
72 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
73 let maybe_schema = metadata
74 .remove(super::ARROW_SCHEMA_META_KEY)
75 .map(|value| get_arrow_schema_from_metadata(&value))
76 .transpose()?;
77
78 if let Some(arrow_schema) = &maybe_schema {
80 arrow_schema.metadata().iter().for_each(|(k, v)| {
81 metadata.entry(k.clone()).or_insert_with(|| v.clone());
82 });
83 }
84
85 let hint = maybe_schema.as_ref().map(|s| s.fields());
86 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
87 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
88 Ok((schema, field_levels.levels))
89}
90
/// Result of converting a Parquet schema to Arrow fields, as produced by
/// [`parquet_to_arrow_field_levels`].
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// The Arrow fields of the converted schema (empty when the conversion
    /// yields no root field).
    pub(crate) fields: Fields,
    /// The root [`ParquetField`] returned by the schema conversion, or `None`
    /// when the conversion yields no root field.
    pub(crate) levels: Option<ParquetField>,
}
103
104pub fn parquet_to_arrow_field_levels(
125 schema: &SchemaDescriptor,
126 mask: ProjectionMask,
127 hint: Option<&Fields>,
128) -> Result<FieldLevels> {
129 match complex::convert_schema(schema, mask, hint)? {
130 Some(field) => match &field.arrow_type {
131 DataType::Struct(fields) => Ok(FieldLevels {
132 fields: fields.clone(),
133 levels: Some(field),
134 }),
135 _ => unreachable!(),
136 },
137 None => Ok(FieldLevels {
138 fields: Fields::empty(),
139 levels: None,
140 }),
141 }
142}
143
144fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
146 let decoded = BASE64_STANDARD.decode(encoded_meta);
147 match decoded {
148 Ok(bytes) => {
149 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
150 &bytes[8..]
151 } else {
152 bytes.as_slice()
153 };
154 match arrow_ipc::root_as_message(slice) {
155 Ok(message) => message
156 .header_as_schema()
157 .map(arrow_ipc::convert::fb_to_schema)
158 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
159 Err(err) => {
160 Err(arrow_err!(
162 "Unable to get root as message stored in {}: {:?}",
163 super::ARROW_SCHEMA_META_KEY,
164 err
165 ))
166 }
167 }
168 }
169 Err(err) => {
170 Err(arrow_err!(
172 "Unable to decode the encoded schema stored in {}, {:?}",
173 super::ARROW_SCHEMA_META_KEY,
174 err
175 ))
176 }
177 }
178}
179
180pub fn encode_arrow_schema(schema: &Schema) -> String {
182 let options = writer::IpcWriteOptions::default();
183 #[allow(deprecated)]
184 let mut dictionary_tracker =
185 writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
186 let data_gen = writer::IpcDataGenerator::default();
187 let mut serialized_schema =
188 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
189
190 let schema_len = serialized_schema.ipc_message.len();
193 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
194 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
195 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
196 len_prefix_schema.append(&mut serialized_schema.ipc_message);
197
198 BASE64_STANDARD.encode(&len_prefix_schema)
199}
200
201pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
208 let encoded = encode_arrow_schema(schema);
209
210 let schema_kv = KeyValue {
211 key: super::ARROW_SCHEMA_META_KEY.to_string(),
212 value: Some(encoded),
213 };
214
215 let meta = props
216 .key_value_metadata
217 .get_or_insert_with(Default::default);
218
219 let schema_meta = meta
221 .iter()
222 .enumerate()
223 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
224 match schema_meta {
225 Some((i, _)) => {
226 meta.remove(i);
227 meta.push(schema_kv);
228 }
229 None => {
230 meta.push(schema_kv);
231 }
232 }
233}
234
/// Converter from an Arrow [`Schema`] to a Parquet [`SchemaDescriptor`].
///
/// Configure it with the builder-style methods and then call `convert`.
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name used for the root group of the generated Parquet schema.
    schema_root: &'a str,
    /// Flag forwarded to the per-field Arrow-to-Parquet conversion.
    coerce_types: bool,
}
288
impl Default for ArrowSchemaConverter<'_> {
    /// Equivalent to [`ArrowSchemaConverter::new`].
    fn default() -> Self {
        Self::new()
    }
}
294
295impl<'a> ArrowSchemaConverter<'a> {
296 pub fn new() -> Self {
298 Self {
299 schema_root: "arrow_schema",
300 coerce_types: false,
301 }
302 }
303
304 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
335 self.coerce_types = coerce_types;
336 self
337 }
338
339 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
341 self.schema_root = schema_root;
342 self
343 }
344
345 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
349 let fields = schema
350 .fields()
351 .iter()
352 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
353 .collect::<Result<_>>()?;
354 let group = Type::group_type_builder(self.schema_root)
355 .with_fields(fields)
356 .build()?;
357 Ok(SchemaDescriptor::new(Arc::new(group)))
358 }
359}
360
361fn parse_key_value_metadata(
362 key_value_metadata: Option<&Vec<KeyValue>>,
363) -> Option<HashMap<String, String>> {
364 match key_value_metadata {
365 Some(key_values) => {
366 let map: HashMap<String, String> = key_values
367 .iter()
368 .filter_map(|kv| {
369 kv.value
370 .as_ref()
371 .map(|value| (kv.key.clone(), value.clone()))
372 })
373 .collect();
374
375 if map.is_empty() {
376 None
377 } else {
378 Some(map)
379 }
380 }
381 None => None,
382 }
383}
384
385pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
387 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
388 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
389
390 let basic_info = parquet_column.self_type().get_basic_info();
391 let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
392 2
393 } else {
394 1
395 });
396 if basic_info.has_id() {
397 meta.insert(
398 PARQUET_FIELD_ID_META_KEY.to_string(),
399 basic_info.id().to_string(),
400 );
401 }
402 #[cfg(feature = "arrow_canonical_extension_types")]
403 if let Some(logical_type) = basic_info.logical_type() {
404 match logical_type {
405 LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
406 LogicalType::Json => ret.try_with_extension_type(Json::default())?,
407 _ => {}
408 }
409 }
410 if !meta.is_empty() {
411 ret.set_metadata(meta);
412 }
413
414 Ok(ret)
415}
416
/// Compute a byte length from a decimal `precision`.
///
/// The result is `ceil((log2(10^precision + 1) + 1) / 8)`, i.e. the smallest
/// `n` for which `2^(8n - 1) >= 10^precision + 1`. Elsewhere in this file the
/// value is used as the `FIXED_LEN_BYTE_ARRAY` length for decimals whose
/// precision exceeds 18.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let magnitude = 10.0_f64.powi(precision as i32);
    let bits = (magnitude + 1.0).log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
427
428fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
430 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
431 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
432 const PARQUET_KEY_FIELD_NAME: &str = "key";
433 const PARQUET_VALUE_FIELD_NAME: &str = "value";
434
435 let name = field.name().as_str();
436 let repetition = if field.is_nullable() {
437 Repetition::OPTIONAL
438 } else {
439 Repetition::REQUIRED
440 };
441 let id = field_id(field);
442 match field.data_type() {
444 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
445 .with_logical_type(Some(LogicalType::Unknown))
446 .with_repetition(repetition)
447 .with_id(id)
448 .build(),
449 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
450 .with_repetition(repetition)
451 .with_id(id)
452 .build(),
453 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
454 .with_logical_type(Some(LogicalType::Integer {
455 bit_width: 8,
456 is_signed: true,
457 }))
458 .with_repetition(repetition)
459 .with_id(id)
460 .build(),
461 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
462 .with_logical_type(Some(LogicalType::Integer {
463 bit_width: 16,
464 is_signed: true,
465 }))
466 .with_repetition(repetition)
467 .with_id(id)
468 .build(),
469 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
470 .with_repetition(repetition)
471 .with_id(id)
472 .build(),
473 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
474 .with_repetition(repetition)
475 .with_id(id)
476 .build(),
477 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
478 .with_logical_type(Some(LogicalType::Integer {
479 bit_width: 8,
480 is_signed: false,
481 }))
482 .with_repetition(repetition)
483 .with_id(id)
484 .build(),
485 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
486 .with_logical_type(Some(LogicalType::Integer {
487 bit_width: 16,
488 is_signed: false,
489 }))
490 .with_repetition(repetition)
491 .with_id(id)
492 .build(),
493 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
494 .with_logical_type(Some(LogicalType::Integer {
495 bit_width: 32,
496 is_signed: false,
497 }))
498 .with_repetition(repetition)
499 .with_id(id)
500 .build(),
501 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
502 .with_logical_type(Some(LogicalType::Integer {
503 bit_width: 64,
504 is_signed: false,
505 }))
506 .with_repetition(repetition)
507 .with_id(id)
508 .build(),
509 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
510 .with_repetition(repetition)
511 .with_id(id)
512 .with_logical_type(Some(LogicalType::Float16))
513 .with_length(2)
514 .build(),
515 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
516 .with_repetition(repetition)
517 .with_id(id)
518 .build(),
519 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
520 .with_repetition(repetition)
521 .with_id(id)
522 .build(),
523 DataType::Timestamp(TimeUnit::Second, _) => {
524 Type::primitive_type_builder(name, PhysicalType::INT64)
526 .with_repetition(repetition)
527 .with_id(id)
528 .build()
529 }
530 DataType::Timestamp(time_unit, tz) => {
531 Type::primitive_type_builder(name, PhysicalType::INT64)
532 .with_logical_type(Some(LogicalType::Timestamp {
533 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
535 unit: match time_unit {
536 TimeUnit::Second => unreachable!(),
537 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
538 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
539 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
540 },
541 }))
542 .with_repetition(repetition)
543 .with_id(id)
544 .build()
545 }
546 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
547 .with_logical_type(Some(LogicalType::Date))
548 .with_repetition(repetition)
549 .with_id(id)
550 .build(),
551 DataType::Date64 => {
552 if coerce_types {
553 Type::primitive_type_builder(name, PhysicalType::INT32)
554 .with_logical_type(Some(LogicalType::Date))
555 .with_repetition(repetition)
556 .with_id(id)
557 .build()
558 } else {
559 Type::primitive_type_builder(name, PhysicalType::INT64)
560 .with_repetition(repetition)
561 .with_id(id)
562 .build()
563 }
564 }
565 DataType::Time32(TimeUnit::Second) => {
566 Type::primitive_type_builder(name, PhysicalType::INT32)
568 .with_repetition(repetition)
569 .with_id(id)
570 .build()
571 }
572 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
573 .with_logical_type(Some(LogicalType::Time {
574 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
575 unit: match unit {
576 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
577 u => unreachable!("Invalid unit for Time32: {:?}", u),
578 },
579 }))
580 .with_repetition(repetition)
581 .with_id(id)
582 .build(),
583 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
584 .with_logical_type(Some(LogicalType::Time {
585 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
586 unit: match unit {
587 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
588 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
589 u => unreachable!("Invalid unit for Time64: {:?}", u),
590 },
591 }))
592 .with_repetition(repetition)
593 .with_id(id)
594 .build(),
595 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
596 .with_repetition(repetition)
597 .with_id(id)
598 .build(),
599 DataType::Interval(_) => {
600 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
601 .with_converted_type(ConvertedType::INTERVAL)
602 .with_repetition(repetition)
603 .with_id(id)
604 .with_length(12)
605 .build()
606 }
607 DataType::Binary | DataType::LargeBinary => {
608 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
609 .with_repetition(repetition)
610 .with_id(id)
611 .build()
612 }
613 DataType::FixedSizeBinary(length) => {
614 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
615 .with_repetition(repetition)
616 .with_id(id)
617 .with_length(*length)
618 .with_logical_type(
619 #[cfg(feature = "arrow_canonical_extension_types")]
620 field
622 .try_extension_type::<Uuid>()
623 .ok()
624 .map(|_| LogicalType::Uuid),
625 #[cfg(not(feature = "arrow_canonical_extension_types"))]
626 None,
627 )
628 .build()
629 }
630 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
631 .with_repetition(repetition)
632 .with_id(id)
633 .build(),
634 DataType::Decimal32(precision, scale)
635 | DataType::Decimal64(precision, scale)
636 | DataType::Decimal128(precision, scale)
637 | DataType::Decimal256(precision, scale) => {
638 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
641 (PhysicalType::INT32, -1)
642 } else if *precision <= 18 {
643 (PhysicalType::INT64, -1)
644 } else {
645 (
646 PhysicalType::FIXED_LEN_BYTE_ARRAY,
647 decimal_length_from_precision(*precision) as i32,
648 )
649 };
650 Type::primitive_type_builder(name, physical_type)
651 .with_repetition(repetition)
652 .with_id(id)
653 .with_length(length)
654 .with_logical_type(Some(LogicalType::Decimal {
655 scale: *scale as i32,
656 precision: *precision as i32,
657 }))
658 .with_precision(*precision as i32)
659 .with_scale(*scale as i32)
660 .build()
661 }
662 DataType::Utf8 | DataType::LargeUtf8 => {
663 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
664 .with_logical_type({
665 #[cfg(feature = "arrow_canonical_extension_types")]
666 {
667 field
670 .try_extension_type::<Json>()
671 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
672 }
673 #[cfg(not(feature = "arrow_canonical_extension_types"))]
674 Some(LogicalType::String)
675 })
676 .with_repetition(repetition)
677 .with_id(id)
678 .build()
679 }
680 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
681 .with_logical_type({
682 #[cfg(feature = "arrow_canonical_extension_types")]
683 {
684 field
687 .try_extension_type::<Json>()
688 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
689 }
690 #[cfg(not(feature = "arrow_canonical_extension_types"))]
691 Some(LogicalType::String)
692 })
693 .with_repetition(repetition)
694 .with_id(id)
695 .build(),
696 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
697 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
698 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
700 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
701 } else {
702 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
703 };
704
705 Type::group_type_builder(name)
706 .with_fields(vec![Arc::new(
707 Type::group_type_builder("list")
708 .with_fields(vec![field_ref])
709 .with_repetition(Repetition::REPEATED)
710 .build()?,
711 )])
712 .with_logical_type(Some(LogicalType::List))
713 .with_repetition(repetition)
714 .with_id(id)
715 .build()
716 }
717 DataType::ListView(_) | DataType::LargeListView(_) => {
718 unimplemented!("ListView/LargeListView not implemented")
719 }
720 DataType::Struct(fields) => {
721 if fields.is_empty() {
722 return Err(arrow_err!("Parquet does not support writing empty structs",));
723 }
724 let fields = fields
726 .iter()
727 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
728 .collect::<Result<_>>()?;
729 Type::group_type_builder(name)
730 .with_fields(fields)
731 .with_repetition(repetition)
732 .with_id(id)
733 .build()
734 }
735 DataType::Map(field, _) => {
736 if let DataType::Struct(struct_fields) = field.data_type() {
737 let map_struct_name = if coerce_types {
739 PARQUET_MAP_STRUCT_NAME
740 } else {
741 field.name()
742 };
743
744 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
746 if coerce_types && fld.name() != name {
747 let f = fld.as_ref().clone().with_name(name);
748 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
749 } else {
750 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
751 }
752 };
753 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
754 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
755
756 Type::group_type_builder(name)
757 .with_fields(vec![Arc::new(
758 Type::group_type_builder(map_struct_name)
759 .with_fields(vec![key_field, val_field])
760 .with_repetition(Repetition::REPEATED)
761 .build()?,
762 )])
763 .with_logical_type(Some(LogicalType::Map))
764 .with_repetition(repetition)
765 .with_id(id)
766 .build()
767 } else {
768 Err(arrow_err!(
769 "DataType::Map should contain a struct field child",
770 ))
771 }
772 }
773 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
774 DataType::Dictionary(_, ref value) => {
775 let dict_field = field.clone().with_data_type(value.as_ref().clone());
777 arrow_to_parquet_type(&dict_field, coerce_types)
778 }
779 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
780 "Converting RunEndEncodedType to parquet not supported",
781 )),
782 }
783}
784
785fn field_id(field: &Field) -> Option<i32> {
786 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
787 value.parse().ok() }
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793
794 use std::{collections::HashMap, sync::Arc};
795
796 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
797
798 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
799 use crate::file::metadata::KeyValue;
800 use crate::file::reader::FileReader;
801 use crate::{
802 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
803 schema::{parser::parse_message_type, types::SchemaDescriptor},
804 };
805
806 #[test]
807 fn test_flat_primitives() {
808 let message_type = "
809 message test_schema {
810 REQUIRED BOOLEAN boolean;
811 REQUIRED INT32 int8 (INT_8);
812 REQUIRED INT32 int16 (INT_16);
813 REQUIRED INT32 uint8 (INTEGER(8,false));
814 REQUIRED INT32 uint16 (INTEGER(16,false));
815 REQUIRED INT32 int32;
816 REQUIRED INT64 int64;
817 OPTIONAL DOUBLE double;
818 OPTIONAL FLOAT float;
819 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
820 OPTIONAL BINARY string (UTF8);
821 OPTIONAL BINARY string_2 (STRING);
822 OPTIONAL BINARY json (JSON);
823 }
824 ";
825 let parquet_group_type = parse_message_type(message_type).unwrap();
826
827 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
828 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
829
830 let arrow_fields = Fields::from(vec![
831 Field::new("boolean", DataType::Boolean, false),
832 Field::new("int8", DataType::Int8, false),
833 Field::new("int16", DataType::Int16, false),
834 Field::new("uint8", DataType::UInt8, false),
835 Field::new("uint16", DataType::UInt16, false),
836 Field::new("int32", DataType::Int32, false),
837 Field::new("int64", DataType::Int64, false),
838 Field::new("double", DataType::Float64, true),
839 Field::new("float", DataType::Float32, true),
840 Field::new("float16", DataType::Float16, true),
841 Field::new("string", DataType::Utf8, true),
842 Field::new("string_2", DataType::Utf8, true),
843 Field::new("json", DataType::Utf8, true),
844 ]);
845
846 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
847 }
848
849 #[test]
850 fn test_decimal_fields() {
851 let message_type = "
852 message test_schema {
853 REQUIRED INT32 decimal1 (DECIMAL(4,2));
854 REQUIRED INT64 decimal2 (DECIMAL(12,2));
855 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
856 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
857 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
858 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
859 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
860 }
861 ";
862
863 let parquet_group_type = parse_message_type(message_type).unwrap();
864
865 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
866 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
867
868 let arrow_fields = Fields::from(vec![
869 Field::new("decimal1", DataType::Decimal128(4, 2), false),
870 Field::new("decimal2", DataType::Decimal128(12, 2), false),
871 Field::new("decimal3", DataType::Decimal128(30, 2), false),
872 Field::new("decimal4", DataType::Decimal128(33, 2), false),
873 Field::new("decimal5", DataType::Decimal128(38, 2), false),
874 Field::new("decimal6", DataType::Decimal256(39, 2), false),
875 Field::new("decimal7", DataType::Decimal256(39, 2), false),
876 ]);
877 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
878 }
879
880 #[test]
881 fn test_byte_array_fields() {
882 let message_type = "
883 message test_schema {
884 REQUIRED BYTE_ARRAY binary;
885 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
886 }
887 ";
888
889 let parquet_group_type = parse_message_type(message_type).unwrap();
890
891 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
892 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
893
894 let arrow_fields = Fields::from(vec![
895 Field::new("binary", DataType::Binary, false),
896 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
897 ]);
898 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
899 }
900
901 #[test]
902 fn test_duplicate_fields() {
903 let message_type = "
904 message test_schema {
905 REQUIRED BOOLEAN boolean;
906 REQUIRED INT32 int8 (INT_8);
907 }
908 ";
909
910 let parquet_group_type = parse_message_type(message_type).unwrap();
911
912 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
913 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
914
915 let arrow_fields = Fields::from(vec![
916 Field::new("boolean", DataType::Boolean, false),
917 Field::new("int8", DataType::Int8, false),
918 ]);
919 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
920
921 let converted_arrow_schema =
922 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
923 .unwrap();
924 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
925 }
926
927 #[test]
928 fn test_parquet_lists() {
929 let mut arrow_fields = Vec::new();
930
931 let message_type = "
933 message test_schema {
934 REQUIRED GROUP my_list (LIST) {
935 REPEATED GROUP list {
936 OPTIONAL BINARY element (UTF8);
937 }
938 }
939 OPTIONAL GROUP my_list (LIST) {
940 REPEATED GROUP list {
941 REQUIRED BINARY element (UTF8);
942 }
943 }
944 OPTIONAL GROUP array_of_arrays (LIST) {
945 REPEATED GROUP list {
946 REQUIRED GROUP element (LIST) {
947 REPEATED GROUP list {
948 REQUIRED INT32 element;
949 }
950 }
951 }
952 }
953 OPTIONAL GROUP my_list (LIST) {
954 REPEATED GROUP element {
955 REQUIRED BINARY str (UTF8);
956 }
957 }
958 OPTIONAL GROUP my_list (LIST) {
959 REPEATED INT32 element;
960 }
961 OPTIONAL GROUP my_list (LIST) {
962 REPEATED GROUP element {
963 REQUIRED BINARY str (UTF8);
964 REQUIRED INT32 num;
965 }
966 }
967 OPTIONAL GROUP my_list (LIST) {
968 REPEATED GROUP array {
969 REQUIRED BINARY str (UTF8);
970 }
971
972 }
973 OPTIONAL GROUP my_list (LIST) {
974 REPEATED GROUP my_list_tuple {
975 REQUIRED BINARY str (UTF8);
976 }
977 }
978 REPEATED INT32 name;
979 }
980 ";
981
982 {
989 arrow_fields.push(Field::new_list(
990 "my_list",
991 Field::new("element", DataType::Utf8, true),
992 false,
993 ));
994 }
995
996 {
1003 arrow_fields.push(Field::new_list(
1004 "my_list",
1005 Field::new("element", DataType::Utf8, false),
1006 true,
1007 ));
1008 }
1009
1010 {
1023 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1024 arrow_fields.push(Field::new_list(
1025 "array_of_arrays",
1026 Field::new_list("element", arrow_inner_list, false),
1027 true,
1028 ));
1029 }
1030
1031 {
1038 arrow_fields.push(Field::new_list(
1039 "my_list",
1040 Field::new("str", DataType::Utf8, false),
1041 true,
1042 ));
1043 }
1044
1045 {
1050 arrow_fields.push(Field::new_list(
1051 "my_list",
1052 Field::new("element", DataType::Int32, false),
1053 true,
1054 ));
1055 }
1056
1057 {
1065 let fields = vec![
1066 Field::new("str", DataType::Utf8, false),
1067 Field::new("num", DataType::Int32, false),
1068 ];
1069 arrow_fields.push(Field::new_list(
1070 "my_list",
1071 Field::new_struct("element", fields, false),
1072 true,
1073 ));
1074 }
1075
1076 {
1084 let fields = vec![Field::new("str", DataType::Utf8, false)];
1085 arrow_fields.push(Field::new_list(
1086 "my_list",
1087 Field::new_struct("array", fields, false),
1088 true,
1089 ));
1090 }
1091
1092 {
1100 let fields = vec![Field::new("str", DataType::Utf8, false)];
1101 arrow_fields.push(Field::new_list(
1102 "my_list",
1103 Field::new_struct("my_list_tuple", fields, false),
1104 true,
1105 ));
1106 }
1107
1108 {
1111 arrow_fields.push(Field::new_list(
1112 "name",
1113 Field::new("name", DataType::Int32, false),
1114 false,
1115 ));
1116 }
1117
1118 let parquet_group_type = parse_message_type(message_type).unwrap();
1119
1120 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1121 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1122 let converted_fields = converted_arrow_schema.fields();
1123
1124 assert_eq!(arrow_fields.len(), converted_fields.len());
1125 for i in 0..arrow_fields.len() {
1126 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1127 }
1128 }
1129
1130 #[test]
1131 fn test_parquet_list_nullable() {
1132 let mut arrow_fields = Vec::new();
1133
1134 let message_type = "
1135 message test_schema {
1136 REQUIRED GROUP my_list1 (LIST) {
1137 REPEATED GROUP list {
1138 OPTIONAL BINARY element (UTF8);
1139 }
1140 }
1141 OPTIONAL GROUP my_list2 (LIST) {
1142 REPEATED GROUP list {
1143 REQUIRED BINARY element (UTF8);
1144 }
1145 }
1146 REQUIRED GROUP my_list3 (LIST) {
1147 REPEATED GROUP list {
1148 REQUIRED BINARY element (UTF8);
1149 }
1150 }
1151 }
1152 ";
1153
1154 {
1161 arrow_fields.push(Field::new_list(
1162 "my_list1",
1163 Field::new("element", DataType::Utf8, true),
1164 false,
1165 ));
1166 }
1167
1168 {
1175 arrow_fields.push(Field::new_list(
1176 "my_list2",
1177 Field::new("element", DataType::Utf8, false),
1178 true,
1179 ));
1180 }
1181
1182 {
1189 arrow_fields.push(Field::new_list(
1190 "my_list3",
1191 Field::new("element", DataType::Utf8, false),
1192 false,
1193 ));
1194 }
1195
1196 let parquet_group_type = parse_message_type(message_type).unwrap();
1197
1198 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1199 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1200 let converted_fields = converted_arrow_schema.fields();
1201
1202 assert_eq!(arrow_fields.len(), converted_fields.len());
1203 for i in 0..arrow_fields.len() {
1204 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1205 }
1206 }
1207
1208 #[test]
1209 fn test_parquet_maps() {
1210 let mut arrow_fields = Vec::new();
1211
1212 let message_type = "
1214 message test_schema {
1215 REQUIRED group my_map1 (MAP) {
1216 REPEATED group key_value {
1217 REQUIRED binary key (UTF8);
1218 OPTIONAL int32 value;
1219 }
1220 }
1221 OPTIONAL group my_map2 (MAP) {
1222 REPEATED group map {
1223 REQUIRED binary str (UTF8);
1224 REQUIRED int32 num;
1225 }
1226 }
1227 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1228 REPEATED group map {
1229 REQUIRED binary key (UTF8);
1230 OPTIONAL int32 value;
1231 }
1232 }
1233 REQUIRED group my_map4 (MAP) {
1234 REPEATED group map {
1235 OPTIONAL binary key (UTF8);
1236 REQUIRED int32 value;
1237 }
1238 }
1239 }
1240 ";
1241
1242 {
1250 arrow_fields.push(Field::new_map(
1251 "my_map1",
1252 "key_value",
1253 Field::new("key", DataType::Utf8, false),
1254 Field::new("value", DataType::Int32, true),
1255 false,
1256 false,
1257 ));
1258 }
1259
1260 {
1268 arrow_fields.push(Field::new_map(
1269 "my_map2",
1270 "map",
1271 Field::new("str", DataType::Utf8, false),
1272 Field::new("num", DataType::Int32, false),
1273 false,
1274 true,
1275 ));
1276 }
1277
1278 {
1286 arrow_fields.push(Field::new_map(
1287 "my_map3",
1288 "map",
1289 Field::new("key", DataType::Utf8, false),
1290 Field::new("value", DataType::Int32, true),
1291 false,
1292 true,
1293 ));
1294 }
1295
1296 {
1304 arrow_fields.push(Field::new_map(
1305 "my_map4",
1306 "map",
1307 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1309 false,
1310 false,
1311 ));
1312 }
1313
1314 let parquet_group_type = parse_message_type(message_type).unwrap();
1315
1316 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1317 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1318 let converted_fields = converted_arrow_schema.fields();
1319
1320 assert_eq!(arrow_fields.len(), converted_fields.len());
1321 for i in 0..arrow_fields.len() {
1322 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1323 }
1324 }
1325
#[test]
fn test_nested_schema() {
    // Parquet side: one required group holding two required leaves, then a
    // required top-level leaf.
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED BOOLEAN leaf1;
            REQUIRED INT32 leaf2;
        }
        REQUIRED INT64 leaf3;
    }
    ";

    // Arrow side: the group is expected to surface as a non-nullable struct.
    let expected_fields = vec![
        Field::new(
            "group1",
            DataType::Struct(Fields::from(vec![
                Field::new("leaf1", DataType::Boolean, false),
                Field::new("leaf2", DataType::Int32, false),
            ])),
            false,
        ),
        Field::new("leaf3", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));
    let arrow_schema = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual_fields = arrow_schema.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1361
#[test]
fn test_nested_schema_partial() {
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";

    // Builds a required struct that wraps a single required INT64 leaf.
    let single_leaf_group = |group: &str, leaf: &str| {
        Field::new(
            group,
            DataType::Struct(Fields::from(vec![Field::new(
                leaf,
                DataType::Int64,
                false,
            )])),
            false,
        )
    };

    // Only leaf1, leaf4 and leaf5 are expected in the projected schema; each
    // group keeps just its selected child.
    let expected_fields = vec![
        single_leaf_group("group1", "leaf1"),
        single_leaf_group("group2", "leaf4"),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // The leaf indices are deliberately unordered and contain a duplicate (4).
    let mask = ProjectionMask::leaves(&descriptor, [3, 0, 4, 4]);
    let projected = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
    let actual_fields = projected.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1412
#[test]
fn test_nested_schema_partial_ordering() {
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";

    // Expected result for both projections below: fields follow the order of
    // the Parquet schema, not the order in which the columns were requested.
    let expected_fields = vec![
        Field::new(
            "group1",
            DataType::Struct(Fields::from(vec![Field::new(
                "leaf1",
                DataType::Int64,
                false,
            )])),
            false,
        ),
        Field::new(
            "group2",
            DataType::Struct(Fields::from(vec![Field::new(
                "leaf4",
                DataType::Int64,
                false,
            )])),
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // Converts with the given mask and compares field by field.
    let assert_projection = |mask: ProjectionMask| {
        let projected = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
        let actual_fields = projected.fields();

        assert_eq!(expected_fields.len(), actual_fields.len());
        for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
            assert_eq!(expected, actual.as_ref());
        }
    };

    // Same selection expressed first by leaf index, then by column path.
    assert_projection(ProjectionMask::leaves(&descriptor, [3, 0, 4]));
    assert_projection(ProjectionMask::columns(
        &descriptor,
        ["group2.leaf4", "group1.leaf1", "leaf5"],
    ));
}
1474
#[test]
fn test_repeated_nested_schema() {
    let message_type = "
    message test_schema {
        OPTIONAL INT32 leaf1;
        REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
                OPTIONAL INT32 leaf3;
            }
        }
    }
    ";

    // Each repeated group is expected as a non-nullable list whose element is
    // a non-nullable struct carrying the group's own name.
    let inner_list = Field::new_list(
        "innerGroup",
        Field::new_struct(
            "innerGroup",
            vec![Field::new("leaf3", DataType::Int32, true)],
            false,
        ),
        false,
    );
    let outer_list = Field::new_list(
        "outerGroup",
        Field::new_struct(
            "outerGroup",
            vec![Field::new("leaf2", DataType::Int32, true), inner_list],
            false,
        ),
        false,
    );
    let expected_fields = vec![Field::new("leaf1", DataType::Int32, true), outer_list];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));
    let arrow_schema = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual_fields = arrow_schema.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1525
#[test]
fn test_column_desc_to_field() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 uint8 (INTEGER(8,false));
        REQUIRED INT32 int16 (INT_16);
        REQUIRED INT32 uint16 (INTEGER(16,false));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (UTF8);
        REPEATED BOOLEAN bools;
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME_MILLIS);
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_nano (TIME(NANOS,false));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
        REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
        REPEATED INT32 int_list;
        REPEATED BINARY byte_list;
        REPEATED BINARY string_list (UTF8);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
    }
    ";

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // Convert every leaf column descriptor on its own.
    let actual_fields: Vec<Field> = descriptor
        .columns()
        .iter()
        .map(|column| parquet_to_arrow_field(column).unwrap())
        .collect();

    // A repeated leaf is expected as a non-nullable list whose non-nullable
    // element reuses the column name.
    let repeated = |name: &str, item_type: DataType| {
        Field::new_list(name, Field::new(name, item_type, false), false)
    };
    // Every timestamp column in this schema is expected with a "UTC" timezone.
    let utc_timestamp = |name: &str, unit: TimeUnit, nullable: bool| {
        Field::new(
            name,
            DataType::Timestamp(unit, Some("UTC".into())),
            nullable,
        )
    };

    let expected_fields = vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        repeated("bools", DataType::Boolean),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
        utc_timestamp("ts_milli", TimeUnit::Millisecond, true),
        utc_timestamp("ts_micro", TimeUnit::Microsecond, false),
        utc_timestamp("ts_nano", TimeUnit::Nanosecond, false),
        repeated("int_list", DataType::Int32),
        repeated("byte_list", DataType::Binary),
        repeated("string_list", DataType::Utf8),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
    ];

    assert_eq!(expected_fields, actual_fields);
}
1624
#[test]
fn test_coerced_map_list() {
    // Arrow schema whose list/map children use non-default names
    // ("item", "entries", "keys", "values").
    let arrow_schema = Schema::new(vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ]);

    // Parses the reference Parquet schema, converts the Arrow schema with the
    // requested coercion setting, and checks both have the same leaf count.
    let assert_same_column_count = |message_type: &str, coerce_types: bool| {
        let root = parse_message_type(message_type).unwrap();
        let reference = SchemaDescriptor::new(Arc::new(root));
        let converted = ArrowSchemaConverter::new()
            .with_coerce_types(coerce_types)
            .convert(&arrow_schema)
            .unwrap();
        assert_eq!(reference.columns().len(), converted.columns().len());
    };

    // Coercion enabled: reference uses list/element and key_value/key/value.
    assert_same_column_count(
        "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP key_value {
                    REQUIRED BINARY key (STRING);
                    OPTIONAL INT32 value;
                }
            }
        }
        ",
        true,
    );

    // Coercion disabled: reference keeps the Arrow child names.
    assert_same_column_count(
        "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN item;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP entries {
                    REQUIRED BINARY keys (STRING);
                    OPTIONAL INT32 values;
                }
            }
        }
        ",
        false,
    );
}
1699
#[test]
fn test_field_to_column_desc() {
    let message_type = "
    message arrow_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 int16 (INTEGER(16,true));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (STRING);
        OPTIONAL GROUP bools (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        REQUIRED GROUP bools_non_null (LIST) {
            REPEATED GROUP list {
                REQUIRED BOOLEAN element;
            }
        }
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME(MILLIS,false));
        OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
        REQUIRED INT64 ts_seconds;
        REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
        REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
        REQUIRED GROUP struct {
            REQUIRED BOOLEAN bools;
            REQUIRED INT32 uint32 (INTEGER(32,false));
            REQUIRED GROUP int32 (LIST) {
                REPEATED GROUP list {
                    OPTIONAL INT32 element;
                }
            }
        }
        REQUIRED BINARY dictionary_strings (STRING);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let reference = SchemaDescriptor::new(Arc::new(root));

    // Field metadata attached to the time columns declared as TIME(.., true).
    let utc_adjusted = || HashMap::from([("adjusted_to_utc".to_string(), "".to_string())]);
    // Timestamp field with an optional timezone string.
    let timestamp = |name: &str, unit: TimeUnit, tz: Option<&str>, nullable: bool| {
        Field::new(
            name,
            DataType::Timestamp(unit, tz.map(|zone| zone.into())),
            nullable,
        )
    };

    let arrow_schema = Schema::new(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("element", DataType::Boolean, true),
            true,
        ),
        Field::new_list(
            "bools_non_null",
            Field::new("element", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new(
            "time_milli_utc",
            DataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(utc_adjusted()),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new(
            "time_micro_utc",
            DataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(utc_adjusted()),
        timestamp("ts_milli", TimeUnit::Millisecond, None, true),
        timestamp("ts_micro", TimeUnit::Microsecond, None, false),
        timestamp("ts_seconds", TimeUnit::Second, Some("UTC"), false),
        timestamp("ts_micro_utc", TimeUnit::Microsecond, Some("UTC"), false),
        timestamp(
            "ts_millis_zero_offset",
            TimeUnit::Millisecond,
            Some("+00:00"),
            false,
        ),
        timestamp(
            "ts_millis_zero_negative_offset",
            TimeUnit::Millisecond,
            Some("-00:00"),
            false,
        ),
        timestamp(
            "ts_micro_non_utc",
            TimeUnit::Microsecond,
            Some("+01:00"),
            false,
        ),
        Field::new_struct(
            "struct",
            vec![
                Field::new("bools", DataType::Boolean, false),
                Field::new("uint32", DataType::UInt32, false),
                Field::new_list(
                    "int32",
                    Field::new("element", DataType::Int32, true),
                    false,
                ),
            ],
            false,
        ),
        Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        Field::new("decimal128", DataType::Decimal128(38, 2), false),
        Field::new("decimal256", DataType::Decimal256(39, 2), false),
    ]);
    let converted = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

    assert_eq!(reference.columns().len(), converted.columns().len());

    for (expected, actual) in reference.columns().iter().zip(converted.columns()) {
        if expected.logical_type().is_some() {
            // The reference column declares a logical type: require the whole
            // column descriptor to match.
            assert_eq!(expected, actual)
        } else {
            // No logical type on the reference column: compare only the name,
            // physical type and converted type.
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.physical_type(), actual.physical_type());
            assert_eq!(expected.converted_type(), actual.converted_type());
        }
    }
}
1876
#[test]
#[should_panic(expected = "Parquet does not support writing empty structs")]
fn test_empty_struct_field() {
    // A struct column with no children; the conversion result is unwrapped so
    // that the returned error surfaces as the expected panic message.
    let empty_struct = Field::new("struct", DataType::Struct(Fields::empty()), false);
    let arrow_schema = Schema::new(vec![empty_struct]);

    let conversion = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema);
    conversion.unwrap();
}
1892
#[test]
fn test_metadata() {
    let message_type = "
    message test_schema {
        OPTIONAL BINARY string (STRING);
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // One entry with a value and one without.
    let key_value_metadata = vec![
        KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
        KeyValue::new("baz".to_owned(), None),
    ];

    // Only the entry that has a value is expected in the Arrow metadata.
    let expected_metadata: HashMap<String, String> =
        HashMap::from([("foo".to_owned(), "bar".to_owned())]);

    let arrow_schema = parquet_to_arrow_schema(&descriptor, Some(&key_value_metadata)).unwrap();

    assert_eq!(arrow_schema.metadata(), &expected_metadata);
}
1916
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
    // Builds an owned metadata map from borrowed key/value pairs.
    let meta = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(pairs.len());
        for (key, value) in pairs {
            map.insert(key.to_string(), value.to_string());
        }
        map
    };

    // Nested fields are built up front so the schema listing below stays flat.
    let c21 = Field::new_list(
        "c21",
        Field::new_list_field(DataType::Boolean, true)
            .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
        false,
    )
    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")]));

    let c22 = Field::new(
        "c22",
        DataType::FixedSizeList(
            Arc::new(Field::new_list_field(DataType::Boolean, true)),
            5,
        ),
        false,
    );

    let c23 = Field::new_list(
        "c23",
        Field::new_large_list(
            "inner",
            Field::new_list_field(
                DataType::Struct(
                    vec![
                        Field::new("a", DataType::Int16, true),
                        Field::new("b", DataType::Float64, false),
                        Field::new("c", DataType::Float32, false),
                        Field::new("d", DataType::Float16, false),
                    ]
                    .into(),
                ),
                false,
            ),
            true,
        ),
        false,
    );

    let c24 = Field::new(
        "c24",
        DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::UInt16, false),
        ])),
        false,
    );

    // `Field::new_dict` is deprecated; it is still used here on purpose.
    #[allow(deprecated)]
    let c31 = Field::new_dict(
        "c31",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        123,
        true,
    )
    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")]));

    let c34 = Field::new_large_list(
        "c34",
        Field::new_list(
            "inner",
            Field::new_list_field(
                DataType::Struct(
                    vec![
                        Field::new("a", DataType::Int16, true),
                        Field::new("b", DataType::Float64, true),
                    ]
                    .into(),
                ),
                true,
            ),
            true,
        ),
        true,
    );

    let c39 = Field::new_map(
        "c39",
        "key_value",
        Field::new("key", DataType::Utf8, false),
        Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
        false, // keys sorted
        true,  // nullable
    );

    let c40 = Field::new_map(
        "c40",
        "my_entries",
        Field::new("my_key", DataType::Utf8, false)
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
        Field::new_list(
            "my_value",
            Field::new_list_field(DataType::Utf8, true)
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
            true,
        )
        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
        false, // keys sorted
        true,  // nullable
    )
    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")]));

    let c41 = Field::new_map(
        "c41",
        "my_entries",
        Field::new("my_key", DataType::Utf8, false),
        Field::new_list(
            "my_value",
            Field::new_list_field(DataType::Utf8, true)
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
            true,
        ),
        false, // keys sorted
        false, // nullable
    );

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, false)
                .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
            Field::new("c2", DataType::Binary, false),
            Field::new("c3", DataType::FixedSizeBinary(3), false),
            Field::new("c4", DataType::Boolean, false),
            Field::new("c5", DataType::Date32, false),
            Field::new("c6", DataType::Date64, false),
            Field::new("c7", DataType::Time32(TimeUnit::Second), false),
            Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
            Field::new(
                "c16",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "c17",
                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                false,
            ),
            Field::new(
                "c18",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
            c21,
            c22,
            c23,
            c24,
            Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
            Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
            c31,
            Field::new("c32", DataType::LargeBinary, true),
            Field::new("c33", DataType::LargeUtf8, true),
            c34,
            Field::new("c35", DataType::Null, true),
            Field::new("c36", DataType::Decimal128(2, 1), false),
            Field::new("c37", DataType::Decimal256(50, 20), false),
            Field::new("c38", DataType::Decimal128(18, 12), true),
            c39,
            c40,
            c41,
        ],
        meta(&[("Key", "Value")]),
    );

    // Write a file that contains only the schema, then read it back.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, arrow_reader.schema().as_ref());

    // Walk the Parquet type tree depth-first and record "<dotted path> -> <id>"
    // for every node that carries a field id.
    let mut pending = Vec::with_capacity(10);
    let mut found_ids = Vec::with_capacity(10);

    let root = arrow_reader.parquet_schema().root_schema_ptr();
    pending.push((root.name().to_string(), root));

    while let Some((path, node)) = pending.pop() {
        if node.is_group() {
            pending.extend(
                node.get_fields()
                    .iter()
                    .map(|child| (format!("{path}.{}", child.name()), child.clone())),
            );
        }

        let info = node.get_basic_info();
        if info.has_id() {
            found_ids.push(format!("{path} -> {}", info.id()));
        }
    }

    // Traversal order is not significant; sort before comparing.
    found_ids.sort_unstable();
    let found_ids: Vec<_> = found_ids.iter().map(|entry| entry.as_str()).collect();

    assert_eq!(
        &found_ids,
        &[
            "arrow_schema.c1 -> 2",
            "arrow_schema.c21 -> 4",
            "arrow_schema.c21.list.item -> 5",
            "arrow_schema.c31 -> 6",
            "arrow_schema.c40 -> 7",
            "arrow_schema.c40.my_entries.my_key -> 8",
            "arrow_schema.c40.my_entries.my_value -> 9",
            "arrow_schema.c40.my_entries.my_value.list.item -> 10",
            "arrow_schema.c41.my_entries.my_value.list.item -> 11",
        ]
    );

    Ok(())
}
2132
#[test]
fn test_read_parquet_field_ids_raw() -> Result<()> {
    // Nullable Utf8 column whose only metadata entry is its Parquet field id.
    let utf8_with_id = |name: &str, id: &str| {
        let metadata: HashMap<String, String> =
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]);
        Field::new(name, DataType::Utf8, true).with_metadata(metadata)
    };
    let schema = Schema::new_with_metadata(
        vec![utf8_with_id("c1", "1"), utf8_with_id("c2", "2")],
        HashMap::new(),
    );

    // Write a file holding just the schema into an in-memory buffer.
    let parquet_bytes =
        ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?.into_inner()?;

    let reader =
        crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
    let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();

    // Convert without passing any key/value metadata, so the Arrow schema is
    // derived from the Parquet schema descriptor alone.
    let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

    // Converting back to Parquet is expected to yield the original field ids.
    let round_tripped = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)?;
    let parquet_fields = round_tripped.root_schema().get_fields();

    assert_eq!(parquet_fields.len(), 2);
    for (field, expected_id) in parquet_fields.iter().zip([1, 2]) {
        assert_eq!(field.get_basic_info().id(), expected_id);
    }

    Ok(())
}
2170
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let metadata: HashMap<String, String> =
        HashMap::from([("Key".to_string(), "Value".to_string())]);

    // List variants whose element fields use non-default names
    // ("array", "items").
    let c21 = Field::new_list("c21", Field::new("array", DataType::Boolean, true), false);
    let c22 = Field::new(
        "c22",
        DataType::FixedSizeList(
            Arc::new(Field::new("items", DataType::Boolean, false)),
            5,
        ),
        false,
    );
    let c23 = Field::new_list(
        "c23",
        Field::new_large_list(
            "items",
            Field::new_struct(
                "items",
                vec![
                    Field::new("a", DataType::Int16, true),
                    Field::new("b", DataType::Float64, false),
                ],
                true,
            ),
            true,
        ),
        true,
    );
    let schema = Schema::new_with_metadata(vec![c21, c22, c23], metadata);

    // Write a file that contains only the schema, then read it back.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, arrow_reader.schema().as_ref());
    Ok(())
}
2221
#[test]
fn test_get_arrow_schema_from_metadata() {
    // An empty metadata value must be rejected with an error.
    let decoded = get_arrow_schema_from_metadata("");
    assert!(decoded.is_err());
}
2226
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    // A 16-byte fixed-size binary column tagged with the Uuid extension type.
    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let arrow_schema = Schema::new(vec![uuid_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();

    assert_eq!(logical_type, Some(LogicalType::Uuid));

    Ok(())
}
2250
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    // A Utf8 column tagged with the Json extension type.
    let json_field =
        Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let arrow_schema = Schema::new(vec![json_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();

    assert_eq!(logical_type, Some(LogicalType::Json));

    Ok(())
}
2275}