1use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26#[cfg(feature = "arrow_canonical_extension_types")]
27use arrow_schema::extension::{Json, Uuid};
28use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
29
30use crate::basic::{
31 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
32};
33use crate::errors::{ParquetError, Result};
34use crate::file::{metadata::KeyValue, properties::WriterProperties};
35use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
36
37mod complex;
38mod primitive;
39
40use crate::arrow::ProjectionMask;
41pub(crate) use complex::{ParquetField, ParquetFieldType};
42
43use super::PARQUET_FIELD_ID_META_KEY;
44
45pub fn parquet_to_arrow_schema(
50 parquet_schema: &SchemaDescriptor,
51 key_value_metadata: Option<&Vec<KeyValue>>,
52) -> Result<Schema> {
53 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
54}
55
56pub fn parquet_to_arrow_schema_by_columns(
59 parquet_schema: &SchemaDescriptor,
60 mask: ProjectionMask,
61 key_value_metadata: Option<&Vec<KeyValue>>,
62) -> Result<Schema> {
63 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
64}
65
66pub(crate) fn parquet_to_arrow_schema_and_fields(
68 parquet_schema: &SchemaDescriptor,
69 mask: ProjectionMask,
70 key_value_metadata: Option<&Vec<KeyValue>>,
71) -> Result<(Schema, Option<ParquetField>)> {
72 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
73 let maybe_schema = metadata
74 .remove(super::ARROW_SCHEMA_META_KEY)
75 .map(|value| get_arrow_schema_from_metadata(&value))
76 .transpose()?;
77
78 if let Some(arrow_schema) = &maybe_schema {
80 arrow_schema.metadata().iter().for_each(|(k, v)| {
81 metadata.entry(k.clone()).or_insert_with(|| v.clone());
82 });
83 }
84
85 let hint = maybe_schema.as_ref().map(|s| s.fields());
86 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
87 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
88 Ok((schema, field_levels.levels))
89}
90
/// The Arrow [`Fields`] derived from a Parquet schema, paired with the
/// crate-internal [`ParquetField`] tree they were derived from.
///
/// Values of this type are produced by [`parquet_to_arrow_field_levels`].
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Arrow fields of the converted schema; empty when the conversion
    /// produced no root field.
    pub(crate) fields: Fields,
    /// Root of the converted field tree, or `None` when the conversion
    /// produced no root field.
    pub(crate) levels: Option<ParquetField>,
}
103
104pub fn parquet_to_arrow_field_levels(
125 schema: &SchemaDescriptor,
126 mask: ProjectionMask,
127 hint: Option<&Fields>,
128) -> Result<FieldLevels> {
129 match complex::convert_schema(schema, mask, hint)? {
130 Some(field) => match &field.arrow_type {
131 DataType::Struct(fields) => Ok(FieldLevels {
132 fields: fields.clone(),
133 levels: Some(field),
134 }),
135 _ => unreachable!(),
136 },
137 None => Ok(FieldLevels {
138 fields: Fields::empty(),
139 levels: None,
140 }),
141 }
142}
143
144fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
146 let decoded = BASE64_STANDARD.decode(encoded_meta);
147 match decoded {
148 Ok(bytes) => {
149 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
150 &bytes[8..]
151 } else {
152 bytes.as_slice()
153 };
154 match arrow_ipc::root_as_message(slice) {
155 Ok(message) => message
156 .header_as_schema()
157 .map(arrow_ipc::convert::fb_to_schema)
158 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
159 Err(err) => {
160 Err(arrow_err!(
162 "Unable to get root as message stored in {}: {:?}",
163 super::ARROW_SCHEMA_META_KEY,
164 err
165 ))
166 }
167 }
168 }
169 Err(err) => {
170 Err(arrow_err!(
172 "Unable to decode the encoded schema stored in {}, {:?}",
173 super::ARROW_SCHEMA_META_KEY,
174 err
175 ))
176 }
177 }
178}
179
180pub fn encode_arrow_schema(schema: &Schema) -> String {
182 let options = writer::IpcWriteOptions::default();
183 #[allow(deprecated)]
184 let mut dictionary_tracker =
185 writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
186 let data_gen = writer::IpcDataGenerator::default();
187 let mut serialized_schema =
188 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
189
190 let schema_len = serialized_schema.ipc_message.len();
193 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
194 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
195 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
196 len_prefix_schema.append(&mut serialized_schema.ipc_message);
197
198 BASE64_STANDARD.encode(&len_prefix_schema)
199}
200
201pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
208 let encoded = encode_arrow_schema(schema);
209
210 let schema_kv = KeyValue {
211 key: super::ARROW_SCHEMA_META_KEY.to_string(),
212 value: Some(encoded),
213 };
214
215 let meta = props
216 .key_value_metadata
217 .get_or_insert_with(Default::default);
218
219 let schema_meta = meta
221 .iter()
222 .enumerate()
223 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
224 match schema_meta {
225 Some((i, _)) => {
226 meta.remove(i);
227 meta.push(schema_kv);
228 }
229 None => {
230 meta.push(schema_kv);
231 }
232 }
233}
234
/// Converter from an Arrow [`Schema`] to a Parquet [`SchemaDescriptor`].
///
/// Configure it with the builder-style methods and then call `convert`.
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name given to the root group of the generated Parquet schema.
    schema_root: &'a str,
    /// Forwarded to the per-field conversion, where it selects
    /// Parquet-standard child names for lists and maps and stores `Date64`
    /// as an `INT32` date.
    coerce_types: bool,
}
288
289impl Default for ArrowSchemaConverter<'_> {
290 fn default() -> Self {
291 Self::new()
292 }
293}
294
295impl<'a> ArrowSchemaConverter<'a> {
296 pub fn new() -> Self {
298 Self {
299 schema_root: "arrow_schema",
300 coerce_types: false,
301 }
302 }
303
304 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
335 self.coerce_types = coerce_types;
336 self
337 }
338
339 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
341 self.schema_root = schema_root;
342 self
343 }
344
345 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
349 let fields = schema
350 .fields()
351 .iter()
352 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
353 .collect::<Result<_>>()?;
354 let group = Type::group_type_builder(self.schema_root)
355 .with_fields(fields)
356 .build()?;
357 Ok(SchemaDescriptor::new(Arc::new(group)))
358 }
359}
360
361#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
366pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
367 ArrowSchemaConverter::new().convert(schema)
368}
369
370fn parse_key_value_metadata(
371 key_value_metadata: Option<&Vec<KeyValue>>,
372) -> Option<HashMap<String, String>> {
373 match key_value_metadata {
374 Some(key_values) => {
375 let map: HashMap<String, String> = key_values
376 .iter()
377 .filter_map(|kv| {
378 kv.value
379 .as_ref()
380 .map(|value| (kv.key.clone(), value.clone()))
381 })
382 .collect();
383
384 if map.is_empty() {
385 None
386 } else {
387 Some(map)
388 }
389 }
390 None => None,
391 }
392}
393
394pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
396 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
397 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
398
399 let basic_info = parquet_column.self_type().get_basic_info();
400 let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
401 2
402 } else {
403 1
404 });
405 if basic_info.has_id() {
406 meta.insert(
407 PARQUET_FIELD_ID_META_KEY.to_string(),
408 basic_info.id().to_string(),
409 );
410 }
411 #[cfg(feature = "arrow_canonical_extension_types")]
412 if let Some(logical_type) = basic_info.logical_type() {
413 match logical_type {
414 LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
415 LogicalType::Json => ret.try_with_extension_type(Json::default())?,
416 _ => {}
417 }
418 }
419 if !meta.is_empty() {
420 ret.set_metadata(meta);
421 }
422
423 Ok(ret)
424}
425
/// Returns the number of bytes used to store a decimal of the given
/// `precision` (number of base-10 digits).
///
/// The result is `ceil((log2(10^precision + 1) + 1) / 8)`: the bits needed
/// to represent `10^precision`, plus one more bit, rounded up to whole bytes.
/// For example, precision 9 yields 4 bytes and precision 38 yields 16 bytes.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let magnitude = 10.0_f64.powi(precision as i32);
    let bits = (magnitude + 1.0).log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
436
437fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
439 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
440 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
441 const PARQUET_KEY_FIELD_NAME: &str = "key";
442 const PARQUET_VALUE_FIELD_NAME: &str = "value";
443
444 let name = field.name().as_str();
445 let repetition = if field.is_nullable() {
446 Repetition::OPTIONAL
447 } else {
448 Repetition::REQUIRED
449 };
450 let id = field_id(field);
451 match field.data_type() {
453 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
454 .with_logical_type(Some(LogicalType::Unknown))
455 .with_repetition(repetition)
456 .with_id(id)
457 .build(),
458 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
459 .with_repetition(repetition)
460 .with_id(id)
461 .build(),
462 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
463 .with_logical_type(Some(LogicalType::Integer {
464 bit_width: 8,
465 is_signed: true,
466 }))
467 .with_repetition(repetition)
468 .with_id(id)
469 .build(),
470 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
471 .with_logical_type(Some(LogicalType::Integer {
472 bit_width: 16,
473 is_signed: true,
474 }))
475 .with_repetition(repetition)
476 .with_id(id)
477 .build(),
478 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
479 .with_repetition(repetition)
480 .with_id(id)
481 .build(),
482 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
483 .with_repetition(repetition)
484 .with_id(id)
485 .build(),
486 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
487 .with_logical_type(Some(LogicalType::Integer {
488 bit_width: 8,
489 is_signed: false,
490 }))
491 .with_repetition(repetition)
492 .with_id(id)
493 .build(),
494 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
495 .with_logical_type(Some(LogicalType::Integer {
496 bit_width: 16,
497 is_signed: false,
498 }))
499 .with_repetition(repetition)
500 .with_id(id)
501 .build(),
502 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
503 .with_logical_type(Some(LogicalType::Integer {
504 bit_width: 32,
505 is_signed: false,
506 }))
507 .with_repetition(repetition)
508 .with_id(id)
509 .build(),
510 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
511 .with_logical_type(Some(LogicalType::Integer {
512 bit_width: 64,
513 is_signed: false,
514 }))
515 .with_repetition(repetition)
516 .with_id(id)
517 .build(),
518 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
519 .with_repetition(repetition)
520 .with_id(id)
521 .with_logical_type(Some(LogicalType::Float16))
522 .with_length(2)
523 .build(),
524 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
525 .with_repetition(repetition)
526 .with_id(id)
527 .build(),
528 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
529 .with_repetition(repetition)
530 .with_id(id)
531 .build(),
532 DataType::Timestamp(TimeUnit::Second, _) => {
533 Type::primitive_type_builder(name, PhysicalType::INT64)
535 .with_repetition(repetition)
536 .with_id(id)
537 .build()
538 }
539 DataType::Timestamp(time_unit, tz) => {
540 Type::primitive_type_builder(name, PhysicalType::INT64)
541 .with_logical_type(Some(LogicalType::Timestamp {
542 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
544 unit: match time_unit {
545 TimeUnit::Second => unreachable!(),
546 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
547 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
548 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
549 },
550 }))
551 .with_repetition(repetition)
552 .with_id(id)
553 .build()
554 }
555 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
556 .with_logical_type(Some(LogicalType::Date))
557 .with_repetition(repetition)
558 .with_id(id)
559 .build(),
560 DataType::Date64 => {
561 if coerce_types {
562 Type::primitive_type_builder(name, PhysicalType::INT32)
563 .with_logical_type(Some(LogicalType::Date))
564 .with_repetition(repetition)
565 .with_id(id)
566 .build()
567 } else {
568 Type::primitive_type_builder(name, PhysicalType::INT64)
569 .with_repetition(repetition)
570 .with_id(id)
571 .build()
572 }
573 }
574 DataType::Time32(TimeUnit::Second) => {
575 Type::primitive_type_builder(name, PhysicalType::INT32)
577 .with_repetition(repetition)
578 .with_id(id)
579 .build()
580 }
581 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
582 .with_logical_type(Some(LogicalType::Time {
583 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
584 unit: match unit {
585 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
586 u => unreachable!("Invalid unit for Time32: {:?}", u),
587 },
588 }))
589 .with_repetition(repetition)
590 .with_id(id)
591 .build(),
592 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
593 .with_logical_type(Some(LogicalType::Time {
594 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
595 unit: match unit {
596 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
597 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
598 u => unreachable!("Invalid unit for Time64: {:?}", u),
599 },
600 }))
601 .with_repetition(repetition)
602 .with_id(id)
603 .build(),
604 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
605 .with_repetition(repetition)
606 .with_id(id)
607 .build(),
608 DataType::Interval(_) => {
609 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
610 .with_converted_type(ConvertedType::INTERVAL)
611 .with_repetition(repetition)
612 .with_id(id)
613 .with_length(12)
614 .build()
615 }
616 DataType::Binary | DataType::LargeBinary => {
617 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
618 .with_repetition(repetition)
619 .with_id(id)
620 .build()
621 }
622 DataType::FixedSizeBinary(length) => {
623 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
624 .with_repetition(repetition)
625 .with_id(id)
626 .with_length(*length)
627 .with_logical_type(
628 #[cfg(feature = "arrow_canonical_extension_types")]
629 field
631 .try_extension_type::<Uuid>()
632 .ok()
633 .map(|_| LogicalType::Uuid),
634 #[cfg(not(feature = "arrow_canonical_extension_types"))]
635 None,
636 )
637 .build()
638 }
639 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
640 .with_repetition(repetition)
641 .with_id(id)
642 .build(),
643 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
644 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
647 (PhysicalType::INT32, -1)
648 } else if *precision <= 18 {
649 (PhysicalType::INT64, -1)
650 } else {
651 (
652 PhysicalType::FIXED_LEN_BYTE_ARRAY,
653 decimal_length_from_precision(*precision) as i32,
654 )
655 };
656 Type::primitive_type_builder(name, physical_type)
657 .with_repetition(repetition)
658 .with_id(id)
659 .with_length(length)
660 .with_logical_type(Some(LogicalType::Decimal {
661 scale: *scale as i32,
662 precision: *precision as i32,
663 }))
664 .with_precision(*precision as i32)
665 .with_scale(*scale as i32)
666 .build()
667 }
668 DataType::Utf8 | DataType::LargeUtf8 => {
669 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
670 .with_logical_type({
671 #[cfg(feature = "arrow_canonical_extension_types")]
672 {
673 field
676 .try_extension_type::<Json>()
677 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
678 }
679 #[cfg(not(feature = "arrow_canonical_extension_types"))]
680 Some(LogicalType::String)
681 })
682 .with_repetition(repetition)
683 .with_id(id)
684 .build()
685 }
686 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
687 .with_logical_type({
688 #[cfg(feature = "arrow_canonical_extension_types")]
689 {
690 field
693 .try_extension_type::<Json>()
694 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
695 }
696 #[cfg(not(feature = "arrow_canonical_extension_types"))]
697 Some(LogicalType::String)
698 })
699 .with_repetition(repetition)
700 .with_id(id)
701 .build(),
702 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
703 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
704 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
706 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
707 } else {
708 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
709 };
710
711 Type::group_type_builder(name)
712 .with_fields(vec![Arc::new(
713 Type::group_type_builder("list")
714 .with_fields(vec![field_ref])
715 .with_repetition(Repetition::REPEATED)
716 .build()?,
717 )])
718 .with_logical_type(Some(LogicalType::List))
719 .with_repetition(repetition)
720 .with_id(id)
721 .build()
722 }
723 DataType::ListView(_) | DataType::LargeListView(_) => {
724 unimplemented!("ListView/LargeListView not implemented")
725 }
726 DataType::Struct(fields) => {
727 if fields.is_empty() {
728 return Err(arrow_err!("Parquet does not support writing empty structs",));
729 }
730 let fields = fields
732 .iter()
733 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
734 .collect::<Result<_>>()?;
735 Type::group_type_builder(name)
736 .with_fields(fields)
737 .with_repetition(repetition)
738 .with_id(id)
739 .build()
740 }
741 DataType::Map(field, _) => {
742 if let DataType::Struct(struct_fields) = field.data_type() {
743 let map_struct_name = if coerce_types {
745 PARQUET_MAP_STRUCT_NAME
746 } else {
747 field.name()
748 };
749
750 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
752 if coerce_types && fld.name() != name {
753 let f = fld.as_ref().clone().with_name(name);
754 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
755 } else {
756 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
757 }
758 };
759 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
760 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
761
762 Type::group_type_builder(name)
763 .with_fields(vec![Arc::new(
764 Type::group_type_builder(map_struct_name)
765 .with_fields(vec![key_field, val_field])
766 .with_repetition(Repetition::REPEATED)
767 .build()?,
768 )])
769 .with_logical_type(Some(LogicalType::Map))
770 .with_repetition(repetition)
771 .with_id(id)
772 .build()
773 } else {
774 Err(arrow_err!(
775 "DataType::Map should contain a struct field child",
776 ))
777 }
778 }
779 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
780 DataType::Dictionary(_, ref value) => {
781 let dict_field = field.clone().with_data_type(value.as_ref().clone());
783 arrow_to_parquet_type(&dict_field, coerce_types)
784 }
785 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
786 "Converting RunEndEncodedType to parquet not supported",
787 )),
788 }
789}
790
791fn field_id(field: &Field) -> Option<i32> {
792 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
793 value.parse().ok() }
795
796#[cfg(test)]
797mod tests {
798 use super::*;
799
800 use std::{collections::HashMap, sync::Arc};
801
802 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
803
804 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
805 use crate::file::metadata::KeyValue;
806 use crate::file::reader::FileReader;
807 use crate::{
808 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
809 schema::{parser::parse_message_type, types::SchemaDescriptor},
810 };
811
812 #[test]
813 fn test_flat_primitives() {
814 let message_type = "
815 message test_schema {
816 REQUIRED BOOLEAN boolean;
817 REQUIRED INT32 int8 (INT_8);
818 REQUIRED INT32 int16 (INT_16);
819 REQUIRED INT32 uint8 (INTEGER(8,false));
820 REQUIRED INT32 uint16 (INTEGER(16,false));
821 REQUIRED INT32 int32;
822 REQUIRED INT64 int64;
823 OPTIONAL DOUBLE double;
824 OPTIONAL FLOAT float;
825 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
826 OPTIONAL BINARY string (UTF8);
827 OPTIONAL BINARY string_2 (STRING);
828 OPTIONAL BINARY json (JSON);
829 }
830 ";
831 let parquet_group_type = parse_message_type(message_type).unwrap();
832
833 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
834 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
835
836 let arrow_fields = Fields::from(vec![
837 Field::new("boolean", DataType::Boolean, false),
838 Field::new("int8", DataType::Int8, false),
839 Field::new("int16", DataType::Int16, false),
840 Field::new("uint8", DataType::UInt8, false),
841 Field::new("uint16", DataType::UInt16, false),
842 Field::new("int32", DataType::Int32, false),
843 Field::new("int64", DataType::Int64, false),
844 Field::new("double", DataType::Float64, true),
845 Field::new("float", DataType::Float32, true),
846 Field::new("float16", DataType::Float16, true),
847 Field::new("string", DataType::Utf8, true),
848 Field::new("string_2", DataType::Utf8, true),
849 Field::new("json", DataType::Utf8, true),
850 ]);
851
852 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
853 }
854
855 #[test]
856 fn test_decimal_fields() {
857 let message_type = "
858 message test_schema {
859 REQUIRED INT32 decimal1 (DECIMAL(4,2));
860 REQUIRED INT64 decimal2 (DECIMAL(12,2));
861 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
862 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
863 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
864 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
865 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
866 }
867 ";
868
869 let parquet_group_type = parse_message_type(message_type).unwrap();
870
871 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
872 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
873
874 let arrow_fields = Fields::from(vec![
875 Field::new("decimal1", DataType::Decimal128(4, 2), false),
876 Field::new("decimal2", DataType::Decimal128(12, 2), false),
877 Field::new("decimal3", DataType::Decimal128(30, 2), false),
878 Field::new("decimal4", DataType::Decimal128(33, 2), false),
879 Field::new("decimal5", DataType::Decimal128(38, 2), false),
880 Field::new("decimal6", DataType::Decimal256(39, 2), false),
881 Field::new("decimal7", DataType::Decimal256(39, 2), false),
882 ]);
883 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
884 }
885
886 #[test]
887 fn test_byte_array_fields() {
888 let message_type = "
889 message test_schema {
890 REQUIRED BYTE_ARRAY binary;
891 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
892 }
893 ";
894
895 let parquet_group_type = parse_message_type(message_type).unwrap();
896
897 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
898 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
899
900 let arrow_fields = Fields::from(vec![
901 Field::new("binary", DataType::Binary, false),
902 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
903 ]);
904 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
905 }
906
907 #[test]
908 fn test_duplicate_fields() {
909 let message_type = "
910 message test_schema {
911 REQUIRED BOOLEAN boolean;
912 REQUIRED INT32 int8 (INT_8);
913 }
914 ";
915
916 let parquet_group_type = parse_message_type(message_type).unwrap();
917
918 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
919 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
920
921 let arrow_fields = Fields::from(vec![
922 Field::new("boolean", DataType::Boolean, false),
923 Field::new("int8", DataType::Int8, false),
924 ]);
925 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
926
927 let converted_arrow_schema =
928 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
929 .unwrap();
930 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
931 }
932
933 #[test]
934 fn test_parquet_lists() {
935 let mut arrow_fields = Vec::new();
936
937 let message_type = "
939 message test_schema {
940 REQUIRED GROUP my_list (LIST) {
941 REPEATED GROUP list {
942 OPTIONAL BINARY element (UTF8);
943 }
944 }
945 OPTIONAL GROUP my_list (LIST) {
946 REPEATED GROUP list {
947 REQUIRED BINARY element (UTF8);
948 }
949 }
950 OPTIONAL GROUP array_of_arrays (LIST) {
951 REPEATED GROUP list {
952 REQUIRED GROUP element (LIST) {
953 REPEATED GROUP list {
954 REQUIRED INT32 element;
955 }
956 }
957 }
958 }
959 OPTIONAL GROUP my_list (LIST) {
960 REPEATED GROUP element {
961 REQUIRED BINARY str (UTF8);
962 }
963 }
964 OPTIONAL GROUP my_list (LIST) {
965 REPEATED INT32 element;
966 }
967 OPTIONAL GROUP my_list (LIST) {
968 REPEATED GROUP element {
969 REQUIRED BINARY str (UTF8);
970 REQUIRED INT32 num;
971 }
972 }
973 OPTIONAL GROUP my_list (LIST) {
974 REPEATED GROUP array {
975 REQUIRED BINARY str (UTF8);
976 }
977
978 }
979 OPTIONAL GROUP my_list (LIST) {
980 REPEATED GROUP my_list_tuple {
981 REQUIRED BINARY str (UTF8);
982 }
983 }
984 REPEATED INT32 name;
985 }
986 ";
987
988 {
995 arrow_fields.push(Field::new_list(
996 "my_list",
997 Field::new("element", DataType::Utf8, true),
998 false,
999 ));
1000 }
1001
1002 {
1009 arrow_fields.push(Field::new_list(
1010 "my_list",
1011 Field::new("element", DataType::Utf8, false),
1012 true,
1013 ));
1014 }
1015
1016 {
1029 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1030 arrow_fields.push(Field::new_list(
1031 "array_of_arrays",
1032 Field::new_list("element", arrow_inner_list, false),
1033 true,
1034 ));
1035 }
1036
1037 {
1044 arrow_fields.push(Field::new_list(
1045 "my_list",
1046 Field::new("str", DataType::Utf8, false),
1047 true,
1048 ));
1049 }
1050
1051 {
1056 arrow_fields.push(Field::new_list(
1057 "my_list",
1058 Field::new("element", DataType::Int32, false),
1059 true,
1060 ));
1061 }
1062
1063 {
1071 let fields = vec![
1072 Field::new("str", DataType::Utf8, false),
1073 Field::new("num", DataType::Int32, false),
1074 ];
1075 arrow_fields.push(Field::new_list(
1076 "my_list",
1077 Field::new_struct("element", fields, false),
1078 true,
1079 ));
1080 }
1081
1082 {
1090 let fields = vec![Field::new("str", DataType::Utf8, false)];
1091 arrow_fields.push(Field::new_list(
1092 "my_list",
1093 Field::new_struct("array", fields, false),
1094 true,
1095 ));
1096 }
1097
1098 {
1106 let fields = vec![Field::new("str", DataType::Utf8, false)];
1107 arrow_fields.push(Field::new_list(
1108 "my_list",
1109 Field::new_struct("my_list_tuple", fields, false),
1110 true,
1111 ));
1112 }
1113
1114 {
1117 arrow_fields.push(Field::new_list(
1118 "name",
1119 Field::new("name", DataType::Int32, false),
1120 false,
1121 ));
1122 }
1123
1124 let parquet_group_type = parse_message_type(message_type).unwrap();
1125
1126 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1127 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1128 let converted_fields = converted_arrow_schema.fields();
1129
1130 assert_eq!(arrow_fields.len(), converted_fields.len());
1131 for i in 0..arrow_fields.len() {
1132 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1133 }
1134 }
1135
1136 #[test]
1137 fn test_parquet_list_nullable() {
1138 let mut arrow_fields = Vec::new();
1139
1140 let message_type = "
1141 message test_schema {
1142 REQUIRED GROUP my_list1 (LIST) {
1143 REPEATED GROUP list {
1144 OPTIONAL BINARY element (UTF8);
1145 }
1146 }
1147 OPTIONAL GROUP my_list2 (LIST) {
1148 REPEATED GROUP list {
1149 REQUIRED BINARY element (UTF8);
1150 }
1151 }
1152 REQUIRED GROUP my_list3 (LIST) {
1153 REPEATED GROUP list {
1154 REQUIRED BINARY element (UTF8);
1155 }
1156 }
1157 }
1158 ";
1159
1160 {
1167 arrow_fields.push(Field::new_list(
1168 "my_list1",
1169 Field::new("element", DataType::Utf8, true),
1170 false,
1171 ));
1172 }
1173
1174 {
1181 arrow_fields.push(Field::new_list(
1182 "my_list2",
1183 Field::new("element", DataType::Utf8, false),
1184 true,
1185 ));
1186 }
1187
1188 {
1195 arrow_fields.push(Field::new_list(
1196 "my_list3",
1197 Field::new("element", DataType::Utf8, false),
1198 false,
1199 ));
1200 }
1201
1202 let parquet_group_type = parse_message_type(message_type).unwrap();
1203
1204 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1205 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1206 let converted_fields = converted_arrow_schema.fields();
1207
1208 assert_eq!(arrow_fields.len(), converted_fields.len());
1209 for i in 0..arrow_fields.len() {
1210 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1211 }
1212 }
1213
1214 #[test]
1215 fn test_parquet_maps() {
1216 let mut arrow_fields = Vec::new();
1217
1218 let message_type = "
1220 message test_schema {
1221 REQUIRED group my_map1 (MAP) {
1222 REPEATED group key_value {
1223 REQUIRED binary key (UTF8);
1224 OPTIONAL int32 value;
1225 }
1226 }
1227 OPTIONAL group my_map2 (MAP) {
1228 REPEATED group map {
1229 REQUIRED binary str (UTF8);
1230 REQUIRED int32 num;
1231 }
1232 }
1233 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1234 REPEATED group map {
1235 REQUIRED binary key (UTF8);
1236 OPTIONAL int32 value;
1237 }
1238 }
1239 REQUIRED group my_map4 (MAP) {
1240 REPEATED group map {
1241 OPTIONAL binary key (UTF8);
1242 REQUIRED int32 value;
1243 }
1244 }
1245 }
1246 ";
1247
1248 {
1256 arrow_fields.push(Field::new_map(
1257 "my_map1",
1258 "key_value",
1259 Field::new("key", DataType::Utf8, false),
1260 Field::new("value", DataType::Int32, true),
1261 false,
1262 false,
1263 ));
1264 }
1265
1266 {
1274 arrow_fields.push(Field::new_map(
1275 "my_map2",
1276 "map",
1277 Field::new("str", DataType::Utf8, false),
1278 Field::new("num", DataType::Int32, false),
1279 false,
1280 true,
1281 ));
1282 }
1283
1284 {
1292 arrow_fields.push(Field::new_map(
1293 "my_map3",
1294 "map",
1295 Field::new("key", DataType::Utf8, false),
1296 Field::new("value", DataType::Int32, true),
1297 false,
1298 true,
1299 ));
1300 }
1301
1302 {
1310 arrow_fields.push(Field::new_map(
1311 "my_map4",
1312 "map",
1313 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1315 false,
1316 false,
1317 ));
1318 }
1319
1320 let parquet_group_type = parse_message_type(message_type).unwrap();
1321
1322 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1323 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1324 let converted_fields = converted_arrow_schema.fields();
1325
1326 assert_eq!(arrow_fields.len(), converted_fields.len());
1327 for i in 0..arrow_fields.len() {
1328 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1329 }
1330 }
1331
/// A required group with two required leaves plus a required top-level leaf
/// converts to an Arrow struct field followed by a primitive field.
#[test]
fn test_nested_schema() {
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED BOOLEAN leaf1;
            REQUIRED INT32 leaf2;
        }
        REQUIRED INT64 leaf3;
    }
    ";

    // Arrow fields the conversion is expected to yield, in schema order.
    let expected = vec![
        Field::new(
            "group1",
            DataType::Struct(Fields::from(vec![
                Field::new("leaf1", DataType::Boolean, false),
                Field::new("leaf2", DataType::Int32, false),
            ])),
            false,
        ),
        Field::new("leaf3", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual = converted.fields();

    // The length check guards the pairwise comparison, since `zip` stops at
    // the shorter side.
    assert_eq!(expected.len(), actual.len());
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1367
/// Projecting leaves 0, 3 and 4 keeps only `group1.leaf1`, `group2.leaf4`
/// and `leaf5`, with the surrounding groups trimmed to the selected leaves.
#[test]
fn test_nested_schema_partial() {
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";

    let expected = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // The index list is deliberately unordered and repeats leaf 4; the
    // expected fields above still come out in schema order, once each.
    let mask = ProjectionMask::leaves(&descriptor, [3, 0, 4, 4]);
    let converted = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
    let actual = converted.fields();

    assert_eq!(expected.len(), actual.len());
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1418
/// The projected Arrow schema follows the Parquet schema order regardless of
/// the order in which leaves are named in the projection mask.
#[test]
fn test_nested_schema_partial_ordering() {
    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";

    let expected = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // Converts under `mask` and checks the result against `expected`.
    let check_projection = |mask: ProjectionMask| {
        let converted = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
        let actual = converted.fields();

        assert_eq!(expected.len(), actual.len());
        for (want, got) in expected.iter().zip(actual.iter()) {
            assert_eq!(want, got.as_ref());
        }
    };

    // Selection by leaf index, given out of schema order.
    check_projection(ProjectionMask::leaves(&descriptor, [3, 0, 4]));

    // Selection by dotted column name, given out of schema order.
    check_projection(ProjectionMask::columns(
        &descriptor,
        ["group2.leaf4", "group1.leaf1", "leaf5"],
    ));
}
1480
/// Bare `REPEATED GROUP`s (no LIST annotation) convert to non-nullable lists
/// of non-nullable structs, where the list item reuses the group's name.
#[test]
fn test_repeated_nested_schema() {
    let message_type = "
    message test_schema {
        OPTIONAL INT32 leaf1;
        REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
                OPTIONAL INT32 leaf3;
            }
        }
    }
    ";

    // Build the expectation inside-out: innermost repeated group first.
    let inner_list = Field::new_list(
        "innerGroup",
        Field::new_struct(
            "innerGroup",
            vec![Field::new("leaf3", DataType::Int32, true)],
            false,
        ),
        false,
    );
    let outer_list = Field::new_list(
        "outerGroup",
        Field::new_struct(
            "outerGroup",
            vec![Field::new("leaf2", DataType::Int32, true), inner_list],
            false,
        ),
        false,
    );
    let expected = vec![Field::new("leaf1", DataType::Int32, true), outer_list];

    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual = converted.fields();

    assert_eq!(expected.len(), actual.len());
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1531
/// Converts every leaf column descriptor individually with
/// `parquet_to_arrow_field` and checks the resulting Arrow fields.
#[test]
fn test_column_desc_to_field() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32   int8  (INT_8);
        REQUIRED INT32   uint8 (INTEGER(8,false));
        REQUIRED INT32   int16 (INT_16);
        REQUIRED INT32   uint16 (INTEGER(16,false));
        REQUIRED INT32   int32;
        REQUIRED INT64   int64;
        OPTIONAL DOUBLE  double;
        OPTIONAL FLOAT   float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY  string (UTF8);
        REPEATED BOOLEAN bools;
        OPTIONAL INT32   date       (DATE);
        OPTIONAL INT32   time_milli (TIME_MILLIS);
        OPTIONAL INT64   time_micro (TIME_MICROS);
        OPTIONAL INT64   time_nano (TIME(NANOS,false));
        OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
        REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
        REPEATED INT32   int_list;
        REPEATED BINARY  byte_list;
        REPEATED BINARY  string_list (UTF8);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    let mut converted = Vec::new();
    for column in descriptor.columns() {
        converted.push(parquet_to_arrow_field(column).unwrap());
    }

    // A REPEATED leaf is expected as a non-nullable list whose non-nullable
    // item carries the same name as the column.
    let repeated = |name: &str, element: DataType| {
        Field::new_list(name, Field::new(name, element, false), false)
    };
    // Timestamps in this schema are all expected with a "UTC" timezone.
    let utc_timestamp = |name: &str, unit: TimeUnit, nullable: bool| {
        Field::new(
            name,
            DataType::Timestamp(unit, Some("UTC".into())),
            nullable,
        )
    };

    let expected = vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        repeated("bools", DataType::Boolean),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
        utc_timestamp("ts_milli", TimeUnit::Millisecond, true),
        utc_timestamp("ts_micro", TimeUnit::Microsecond, false),
        utc_timestamp("ts_nano", TimeUnit::Nanosecond, false),
        repeated("int_list", DataType::Int32),
        repeated("byte_list", DataType::Binary),
        repeated("string_list", DataType::Utf8),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
    ];

    assert_eq!(expected, converted);
}
1630
/// Checks how list/map child names are emitted when converting an Arrow
/// schema to Parquet, with and without type coercion.
///
/// The Arrow schema deliberately uses child names ("item", "entries",
/// "keys", "values") that differ from the ones in the first Parquet schema
/// below ("element", "key_value", "key", "value").
#[test]
fn test_coerced_map_list() {
    // Dotted leaf column paths of a descriptor, in column order. Comparing
    // these (and not just the column count, which is 3 in both scenarios)
    // is what actually detects whether names were rewritten.
    let leaf_paths = |descr: &SchemaDescriptor| -> Vec<String> {
        descr
            .columns()
            .iter()
            .map(|column| column.path().string())
            .collect()
    };

    // Arrow schema whose list/map children use non-Parquet names.
    let arrow_fields = vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ];
    let arrow_schema = Schema::new(arrow_fields);

    // With coercion: children are expected under the coerced names.
    let message_type = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP key_value {
                REQUIRED BINARY key (STRING);
                OPTIONAL INT32 value;
            }
        }
    }
    ";
    let parquet_group_type = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
    let converted_arrow_schema = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)
        .unwrap();
    assert_eq!(
        parquet_schema.columns().len(),
        converted_arrow_schema.columns().len()
    );
    assert_eq!(
        leaf_paths(&parquet_schema),
        leaf_paths(&converted_arrow_schema)
    );

    // Without coercion: the Arrow child names are expected to be kept.
    let message_type = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN item;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP entries {
                REQUIRED BINARY keys (STRING);
                OPTIONAL INT32 values;
            }
        }
    }
    ";
    let parquet_group_type = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
    let converted_arrow_schema = ArrowSchemaConverter::new()
        .with_coerce_types(false)
        .convert(&arrow_schema)
        .unwrap();
    assert_eq!(
        parquet_schema.columns().len(),
        converted_arrow_schema.columns().len()
    );
    assert_eq!(
        leaf_paths(&parquet_schema),
        leaf_paths(&converted_arrow_schema)
    );
}
1705
/// Converts a broad Arrow schema to Parquet and compares the resulting leaf
/// column descriptors against those of a hand-written Parquet schema.
#[test]
fn test_field_to_column_desc() {
    let message_type = "
    message arrow_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32   int8  (INT_8);
        REQUIRED INT32   int16 (INTEGER(16,true));
        REQUIRED INT32   int32;
        REQUIRED INT64   int64;
        OPTIONAL DOUBLE  double;
        OPTIONAL FLOAT   float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY  string (STRING);
        OPTIONAL GROUP   bools (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        REQUIRED GROUP   bools_non_null (LIST) {
            REPEATED GROUP list {
                REQUIRED BOOLEAN element;
            }
        }
        OPTIONAL INT32   date       (DATE);
        OPTIONAL INT32   time_milli (TIME(MILLIS,false));
        OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
        OPTIONAL INT64   time_micro (TIME_MICROS);
        OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
        OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
        REQUIRED INT64   ts_seconds;
        REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
        REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
        REQUIRED GROUP struct {
            REQUIRED BOOLEAN bools;
            REQUIRED INT32 uint32 (INTEGER(32,false));
            REQUIRED GROUP   int32 (LIST) {
                REPEATED GROUP list {
                    OPTIONAL INT32 element;
                }
            }
        }
        REQUIRED BINARY  dictionary_strings (STRING);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let expected_descr = SchemaDescriptor::new(Arc::new(root));

    // Field metadata attached to the two `*_utc` time columns below.
    let adjusted_to_utc = || -> HashMap<String, String> {
        HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])
    };
    // Non-nullable timestamp with an explicit timezone string.
    let zoned = |name: &str, unit: TimeUnit, tz: &str| {
        Field::new(name, DataType::Timestamp(unit, Some(tz.into())), false)
    };

    let arrow_schema = Schema::new(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("element", DataType::Boolean, true),
            true,
        ),
        Field::new_list(
            "bools_non_null",
            Field::new("element", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new(
            "time_milli_utc",
            DataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new(
            "time_micro_utc",
            DataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new(
            "ts_milli",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "ts_micro",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
        zoned("ts_seconds", TimeUnit::Second, "UTC"),
        zoned("ts_micro_utc", TimeUnit::Microsecond, "UTC"),
        zoned("ts_millis_zero_offset", TimeUnit::Millisecond, "+00:00"),
        zoned(
            "ts_millis_zero_negative_offset",
            TimeUnit::Millisecond,
            "-00:00",
        ),
        zoned("ts_micro_non_utc", TimeUnit::Microsecond, "+01:00"),
        Field::new_struct(
            "struct",
            vec![
                Field::new("bools", DataType::Boolean, false),
                Field::new("uint32", DataType::UInt32, false),
                Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
            ],
            false,
        ),
        Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        Field::new("decimal128", DataType::Decimal128(38, 2), false),
        Field::new("decimal256", DataType::Decimal256(39, 2), false),
    ]);
    let converted_descr = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

    assert_eq!(
        expected_descr.columns().len(),
        converted_descr.columns().len()
    );

    let column_pairs = expected_descr
        .columns()
        .iter()
        .zip(converted_descr.columns());
    for (expected, actual) in column_pairs {
        if expected.logical_type().is_some() {
            // The hand-written column declares a logical type: the two
            // descriptors must match exactly.
            assert_eq!(expected, actual)
        } else {
            // No logical type on the hand-written side: compare only name,
            // physical type and converted type (presumably because the
            // converter may still attach a logical type on its side).
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.physical_type(), actual.physical_type());
            assert_eq!(expected.converted_type(), actual.converted_type());
        }
    }
}
1882
/// Converting a struct field with no children must fail; the `unwrap` on the
/// conversion result is what triggers the expected panic message.
#[test]
#[should_panic(expected = "Parquet does not support writing empty structs")]
fn test_empty_struct_field() {
    let empty_struct = Field::new("struct", DataType::Struct(Fields::empty()), false);
    let schema = Schema::new(vec![empty_struct]);

    let outcome = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&schema);

    outcome.unwrap();
}
1898
/// File-level key/value metadata is surfaced as Arrow schema metadata; the
/// entry without a value ("baz") is absent from the expected map.
#[test]
fn test_metadata() {
    let message_type = "
    message test_schema {
        OPTIONAL BINARY  string (STRING);
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    let file_metadata = vec![
        KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
        KeyValue::new("baz".to_owned(), None),
    ];

    let expected: HashMap<String, String> = vec![("foo".to_owned(), "bar".to_owned())]
        .into_iter()
        .collect();

    let converted = parquet_to_arrow_schema(&descriptor, Some(&file_metadata)).unwrap();

    assert_eq!(converted.metadata(), &expected);
}
1922
/// Writes an empty Parquet file from a wide Arrow schema, reads it back and
/// checks that (a) the Arrow schema is unchanged and (b) the field ids given
/// via field metadata appear on the matching nodes of the Parquet schema.
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
    // Builds an owned metadata map from borrowed key/value pairs.
    let meta = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(pairs.len());
        for (key, value) in pairs {
            map.insert(key.to_string(), value.to_string());
        }
        map
    };
    // Metadata holding only a Parquet field id.
    let field_id = |id: &str| meta(&[(PARQUET_FIELD_ID_META_KEY, id)]);

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, false)
                .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
            Field::new("c2", DataType::Binary, false),
            Field::new("c3", DataType::FixedSizeBinary(3), false),
            Field::new("c4", DataType::Boolean, false),
            Field::new("c5", DataType::Date32, false),
            Field::new("c6", DataType::Date64, false),
            Field::new("c7", DataType::Time32(TimeUnit::Second), false),
            Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
            Field::new(
                "c16",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "c17",
                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                false,
            ),
            Field::new(
                "c18",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new_list(
                "c21",
                Field::new_list_field(DataType::Boolean, true)
                    .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                false,
            )
            .with_metadata(field_id("4")),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new_list_field(DataType::Boolean, true)),
                    5,
                ),
                false,
            ),
            Field::new_list(
                "c23",
                Field::new_large_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(Fields::from(vec![
                            Field::new("a", DataType::Int16, true),
                            Field::new("b", DataType::Float64, false),
                            Field::new("c", DataType::Float32, false),
                            Field::new("d", DataType::Float16, false),
                        ])),
                        false,
                    ),
                    true,
                ),
                false,
            ),
            Field::new(
                "c24",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::Utf8, false),
                    Field::new("b", DataType::UInt16, false),
                ])),
                false,
            ),
            Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
            Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
            // NOTE(review): the lint is silenced for the expression below,
            // presumably because `Field::new_dict` is deprecated — confirm.
            #[allow(deprecated)]
            Field::new_dict(
                "c31",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
                123,
                true,
            )
            .with_metadata(field_id("6")),
            Field::new("c32", DataType::LargeBinary, true),
            Field::new("c33", DataType::LargeUtf8, true),
            Field::new_large_list(
                "c34",
                Field::new_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(Fields::from(vec![
                            Field::new("a", DataType::Int16, true),
                            Field::new("b", DataType::Float64, true),
                        ])),
                        true,
                    ),
                    true,
                ),
                true,
            ),
            Field::new("c35", DataType::Null, true),
            Field::new("c36", DataType::Decimal128(2, 1), false),
            Field::new("c37", DataType::Decimal256(50, 20), false),
            Field::new("c38", DataType::Decimal128(18, 12), true),
            Field::new_map(
                "c39",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                false,
                true,
            ),
            Field::new_map(
                "c40",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false).with_metadata(field_id("8")),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true).with_metadata(field_id("10")),
                    true,
                )
                .with_metadata(field_id("9")),
                false,
                true,
            )
            .with_metadata(field_id("7")),
            Field::new_map(
                "c41",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true).with_metadata(field_id("11")),
                    true,
                ),
                false,
                false,
            ),
        ],
        meta(&[("Key", "Value")]),
    );

    // Write a file containing only the schema (no record batches).
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    // Read it back; the Arrow schema must be identical to the input.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, builder.schema().as_ref());

    // Walk the Parquet schema tree depth-first with an explicit stack,
    // recording "<dotted path> -> <id>" for every node that carries an id.
    let root = builder.parquet_schema().root_schema_ptr();
    let mut pending = Vec::with_capacity(10);
    let mut found = Vec::with_capacity(10);
    pending.push((root.name().to_string(), root));

    while let Some((path, node)) = pending.pop() {
        let info = node.get_basic_info();
        if info.has_id() {
            found.push(format!("{path} -> {}", info.id()));
        }

        if node.is_group() {
            pending.extend(
                node.get_fields()
                    .iter()
                    .map(|child| (format!("{path}.{}", child.name()), child.clone())),
            );
        }
    }

    // Sorting makes the comparison independent of traversal order.
    found.sort_unstable();
    let found: Vec<&str> = found.iter().map(String::as_str).collect();

    assert_eq!(
        &found,
        &[
            "arrow_schema.c1 -> 2",
            "arrow_schema.c21 -> 4",
            "arrow_schema.c21.list.item -> 5",
            "arrow_schema.c31 -> 6",
            "arrow_schema.c40 -> 7",
            "arrow_schema.c40.my_entries.my_key -> 8",
            "arrow_schema.c40.my_entries.my_value -> 9",
            "arrow_schema.c40.my_entries.my_value.list.item -> 10",
            "arrow_schema.c41.my_entries.my_value.list.item -> 11",
        ]
    );

    Ok(())
}
2138
/// Field ids written into a Parquet file must be recoverable from the
/// Parquet schema alone, i.e. when no key/value metadata is handed to
/// `parquet_to_arrow_schema`.
#[test]
fn test_read_parquet_field_ids_raw() -> Result<()> {
    // Metadata map holding only a Parquet field id.
    let with_id = |id: &str| -> HashMap<String, String> {
        let mut map = HashMap::new();
        map.insert(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string());
        map
    };

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, true).with_metadata(with_id("1")),
            Field::new("c2", DataType::Utf8, true).with_metadata(with_id("2")),
        ],
        HashMap::new(),
    );

    // Serialise an empty file into an in-memory buffer.
    let parquet_bytes =
        ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?.into_inner()?;

    let file_reader =
        crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
    let schema_descriptor = file_reader.metadata().file_metadata().schema_descr_ptr();

    // `None` for the key/value metadata: the conversion sees only the
    // Parquet schema descriptor.
    let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

    // Convert back to Parquet and check the ids survived both directions.
    let round_tripped = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)?;
    let top_level = round_tripped.root_schema().get_fields();

    assert_eq!(top_level.len(), 2);
    assert_eq!(top_level[0].get_basic_info().id(), 1);
    assert_eq!(top_level[1].get_basic_info().id(), 2);

    Ok(())
}
2176
/// List-like fields whose items use non-default names ("array", "items")
/// must come back unchanged after a write/read cycle.
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let metadata: HashMap<String, String> = vec![("Key".to_string(), "Value".to_string())]
        .into_iter()
        .collect();

    let bool_list = Field::new_list("c21", Field::new("array", DataType::Boolean, true), false);
    let fixed_size_list = Field::new(
        "c22",
        DataType::FixedSizeList(
            Arc::new(Field::new("items", DataType::Boolean, false)),
            5,
        ),
        false,
    );
    let nested_list = Field::new_list(
        "c23",
        Field::new_large_list(
            "items",
            Field::new_struct(
                "items",
                vec![
                    Field::new("a", DataType::Int16, true),
                    Field::new("b", DataType::Float64, false),
                ],
                true,
            ),
            true,
        ),
        true,
    );

    let schema =
        Schema::new_with_metadata(vec![bool_list, fixed_size_list, nested_list], metadata);

    // Write a file containing only the schema (no record batches).
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    // Read it back and compare against the schema that was written.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, builder.schema().as_ref());

    Ok(())
}
2227
/// An empty encoded-schema string must be rejected with an error.
#[test]
fn test_get_arrow_schema_from_metadata() {
    let decoded = get_arrow_schema_from_metadata("");
    assert!(decoded.is_err());
}
2232
/// A `FixedSizeBinary(16)` field tagged with the `Uuid` extension type is
/// written with the Parquet UUID logical type.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let arrow_schema = Schema::new(vec![uuid_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let column = parquet_schema.column(0);

    assert_eq!(column.logical_type(), Some(LogicalType::Uuid));

    Ok(())
}
2256
/// A `Utf8` field tagged with the `Json` extension type is written with the
/// Parquet JSON logical type.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    let json_field =
        Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let arrow_schema = Schema::new(vec![json_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let column = parquet_schema.column(0);

    assert_eq!(column.logical_type(), Some(LogicalType::Json));

    Ok(())
}
2281}