1use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38
39use super::PARQUET_FIELD_ID_META_KEY;
40use crate::arrow::ProjectionMask;
41use crate::arrow::schema::extension::{
42 has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
43 logical_type_for_struct, try_add_extension_type,
44};
45pub(crate) use complex::{ParquetField, ParquetFieldType};
46
47pub fn parquet_to_arrow_schema(
52 parquet_schema: &SchemaDescriptor,
53 key_value_metadata: Option<&Vec<KeyValue>>,
54) -> Result<Schema> {
55 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
56}
57
58pub fn parquet_to_arrow_schema_by_columns(
61 parquet_schema: &SchemaDescriptor,
62 mask: ProjectionMask,
63 key_value_metadata: Option<&Vec<KeyValue>>,
64) -> Result<Schema> {
65 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
66}
67
68pub(crate) fn parquet_to_arrow_schema_and_fields(
74 parquet_schema: &SchemaDescriptor,
75 mask: ProjectionMask,
76 key_value_metadata: Option<&Vec<KeyValue>>,
77) -> Result<(Schema, Option<ParquetField>)> {
78 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
79 let maybe_schema = metadata
80 .remove(super::ARROW_SCHEMA_META_KEY)
81 .map(|value| get_arrow_schema_from_metadata(&value))
82 .transpose()?;
83
84 if let Some(arrow_schema) = &maybe_schema {
86 arrow_schema.metadata().iter().for_each(|(k, v)| {
87 metadata.entry(k.clone()).or_insert_with(|| v.clone());
88 });
89 }
90
91 let hint = maybe_schema.as_ref().map(|s| s.fields());
92 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
93 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
94 Ok((schema, field_levels.levels))
95}
96
97#[derive(Debug, Clone)]
105pub struct FieldLevels {
106 pub(crate) fields: Fields,
107 pub(crate) levels: Option<ParquetField>,
108}
109
110pub fn parquet_to_arrow_field_levels(
131 schema: &SchemaDescriptor,
132 mask: ProjectionMask,
133 hint: Option<&Fields>,
134) -> Result<FieldLevels> {
135 match complex::convert_schema(schema, mask, hint)? {
136 Some(field) => match &field.arrow_type {
137 DataType::Struct(fields) => Ok(FieldLevels {
138 fields: fields.clone(),
139 levels: Some(field),
140 }),
141 _ => unreachable!(),
142 },
143 None => Ok(FieldLevels {
144 fields: Fields::empty(),
145 levels: None,
146 }),
147 }
148}
149
150fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
152 let decoded = BASE64_STANDARD.decode(encoded_meta);
153 match decoded {
154 Ok(bytes) => {
155 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
156 &bytes[8..]
157 } else {
158 bytes.as_slice()
159 };
160 match arrow_ipc::root_as_message(slice) {
161 Ok(message) => message
162 .header_as_schema()
163 .map(arrow_ipc::convert::fb_to_schema)
164 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
165 Err(err) => {
166 Err(arrow_err!(
168 "Unable to get root as message stored in {}: {:?}",
169 super::ARROW_SCHEMA_META_KEY,
170 err
171 ))
172 }
173 }
174 }
175 Err(err) => {
176 Err(arrow_err!(
178 "Unable to decode the encoded schema stored in {}, {:?}",
179 super::ARROW_SCHEMA_META_KEY,
180 err
181 ))
182 }
183 }
184}
185
186pub fn encode_arrow_schema(schema: &Schema) -> String {
188 let options = writer::IpcWriteOptions::default();
189 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
190 let data_gen = writer::IpcDataGenerator::default();
191 let mut serialized_schema =
192 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
193
194 let schema_len = serialized_schema.ipc_message.len();
197 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
198 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
199 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
200 len_prefix_schema.append(&mut serialized_schema.ipc_message);
201
202 BASE64_STANDARD.encode(&len_prefix_schema)
203}
204
205pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
212 let encoded = encode_arrow_schema(schema);
213
214 let schema_kv = KeyValue {
215 key: super::ARROW_SCHEMA_META_KEY.to_string(),
216 value: Some(encoded),
217 };
218
219 let meta = props
220 .key_value_metadata
221 .get_or_insert_with(Default::default);
222
223 let schema_meta = meta
225 .iter()
226 .enumerate()
227 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
228 match schema_meta {
229 Some((i, _)) => {
230 meta.remove(i);
231 meta.push(schema_kv);
232 }
233 None => {
234 meta.push(schema_kv);
235 }
236 }
237}
238
239#[derive(Debug)]
284pub struct ArrowSchemaConverter<'a> {
285 schema_root: &'a str,
287 coerce_types: bool,
291}
292
293impl Default for ArrowSchemaConverter<'_> {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299impl<'a> ArrowSchemaConverter<'a> {
300 pub fn new() -> Self {
302 Self {
303 schema_root: "arrow_schema",
304 coerce_types: false,
305 }
306 }
307
308 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
339 self.coerce_types = coerce_types;
340 self
341 }
342
343 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
345 self.schema_root = schema_root;
346 self
347 }
348
349 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
353 let fields = schema
354 .fields()
355 .iter()
356 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
357 .collect::<Result<_>>()?;
358 let group = Type::group_type_builder(self.schema_root)
359 .with_fields(fields)
360 .build()?;
361 Ok(SchemaDescriptor::new(Arc::new(group)))
362 }
363}
364
365fn parse_key_value_metadata(
366 key_value_metadata: Option<&Vec<KeyValue>>,
367) -> Option<HashMap<String, String>> {
368 match key_value_metadata {
369 Some(key_values) => {
370 let map: HashMap<String, String> = key_values
371 .iter()
372 .filter_map(|kv| {
373 kv.value
374 .as_ref()
375 .map(|value| (kv.key.clone(), value.clone()))
376 })
377 .collect();
378
379 if map.is_empty() { None } else { Some(map) }
380 }
381 None => None,
382 }
383}
384
385pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
387 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
388 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
389
390 let parquet_type = parquet_column.self_type();
391 let basic_info = parquet_type.get_basic_info();
392
393 let mut hash_map_size = 0;
394 if basic_info.has_id() {
395 hash_map_size += 1;
396 }
397 if has_extension_type(parquet_type) {
398 hash_map_size += 1;
399 }
400 if hash_map_size == 0 {
401 return Ok(ret);
402 }
403 ret.set_metadata(HashMap::with_capacity(hash_map_size));
404 if basic_info.has_id() {
405 ret.metadata_mut().insert(
406 PARQUET_FIELD_ID_META_KEY.to_string(),
407 basic_info.id().to_string(),
408 );
409 }
410 try_add_extension_type(ret, parquet_column.self_type())
411}
412
/// Returns the number of bytes a `FIXED_LEN_BYTE_ARRAY` needs to hold a decimal of
/// the given `precision`.
///
/// This is the smallest `n` satisfying `n >= (log2(10^precision + 1) + 1) / 8`. The
/// `+ 1` accounts for the sign bit of the two's-complement representation.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let max_unscaled = 10.0_f64.powi(i32::from(precision));
    let bits_needed = (max_unscaled + 1.0).log2() + 1.0;
    (bits_needed / 8.0).ceil() as usize
}
423
424fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
426 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
427 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
428 const PARQUET_KEY_FIELD_NAME: &str = "key";
429 const PARQUET_VALUE_FIELD_NAME: &str = "value";
430
431 let name = field.name().as_str();
432 let repetition = if field.is_nullable() {
433 Repetition::OPTIONAL
434 } else {
435 Repetition::REQUIRED
436 };
437 let id = field_id(field);
438 match field.data_type() {
440 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
441 .with_logical_type(Some(LogicalType::Unknown))
442 .with_repetition(repetition)
443 .with_id(id)
444 .build(),
445 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
446 .with_repetition(repetition)
447 .with_id(id)
448 .build(),
449 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
450 .with_logical_type(Some(LogicalType::Integer {
451 bit_width: 8,
452 is_signed: true,
453 }))
454 .with_repetition(repetition)
455 .with_id(id)
456 .build(),
457 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
458 .with_logical_type(Some(LogicalType::Integer {
459 bit_width: 16,
460 is_signed: true,
461 }))
462 .with_repetition(repetition)
463 .with_id(id)
464 .build(),
465 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
466 .with_repetition(repetition)
467 .with_id(id)
468 .build(),
469 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
470 .with_repetition(repetition)
471 .with_id(id)
472 .build(),
473 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
474 .with_logical_type(Some(LogicalType::Integer {
475 bit_width: 8,
476 is_signed: false,
477 }))
478 .with_repetition(repetition)
479 .with_id(id)
480 .build(),
481 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
482 .with_logical_type(Some(LogicalType::Integer {
483 bit_width: 16,
484 is_signed: false,
485 }))
486 .with_repetition(repetition)
487 .with_id(id)
488 .build(),
489 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
490 .with_logical_type(Some(LogicalType::Integer {
491 bit_width: 32,
492 is_signed: false,
493 }))
494 .with_repetition(repetition)
495 .with_id(id)
496 .build(),
497 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
498 .with_logical_type(Some(LogicalType::Integer {
499 bit_width: 64,
500 is_signed: false,
501 }))
502 .with_repetition(repetition)
503 .with_id(id)
504 .build(),
505 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
506 .with_repetition(repetition)
507 .with_id(id)
508 .with_logical_type(Some(LogicalType::Float16))
509 .with_length(2)
510 .build(),
511 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
512 .with_repetition(repetition)
513 .with_id(id)
514 .build(),
515 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
516 .with_repetition(repetition)
517 .with_id(id)
518 .build(),
519 DataType::Timestamp(TimeUnit::Second, _) => {
520 Type::primitive_type_builder(name, PhysicalType::INT64)
522 .with_repetition(repetition)
523 .with_id(id)
524 .build()
525 }
526 DataType::Timestamp(time_unit, tz) => {
527 Type::primitive_type_builder(name, PhysicalType::INT64)
528 .with_logical_type(Some(LogicalType::Timestamp {
529 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
531 unit: match time_unit {
532 TimeUnit::Second => unreachable!(),
533 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
534 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
535 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
536 },
537 }))
538 .with_repetition(repetition)
539 .with_id(id)
540 .build()
541 }
542 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
543 .with_logical_type(Some(LogicalType::Date))
544 .with_repetition(repetition)
545 .with_id(id)
546 .build(),
547 DataType::Date64 => {
548 if coerce_types {
549 Type::primitive_type_builder(name, PhysicalType::INT32)
550 .with_logical_type(Some(LogicalType::Date))
551 .with_repetition(repetition)
552 .with_id(id)
553 .build()
554 } else {
555 Type::primitive_type_builder(name, PhysicalType::INT64)
556 .with_repetition(repetition)
557 .with_id(id)
558 .build()
559 }
560 }
561 DataType::Time32(TimeUnit::Second) => {
562 Type::primitive_type_builder(name, PhysicalType::INT32)
564 .with_repetition(repetition)
565 .with_id(id)
566 .build()
567 }
568 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
569 .with_logical_type(Some(LogicalType::Time {
570 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
571 unit: match unit {
572 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
573 u => unreachable!("Invalid unit for Time32: {:?}", u),
574 },
575 }))
576 .with_repetition(repetition)
577 .with_id(id)
578 .build(),
579 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
580 .with_logical_type(Some(LogicalType::Time {
581 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
582 unit: match unit {
583 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
584 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
585 u => unreachable!("Invalid unit for Time64: {:?}", u),
586 },
587 }))
588 .with_repetition(repetition)
589 .with_id(id)
590 .build(),
591 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
592 .with_repetition(repetition)
593 .with_id(id)
594 .build(),
595 DataType::Interval(_) => {
596 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
597 .with_converted_type(ConvertedType::INTERVAL)
598 .with_repetition(repetition)
599 .with_id(id)
600 .with_length(12)
601 .build()
602 }
603 DataType::Binary | DataType::LargeBinary => {
604 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
605 .with_repetition(repetition)
606 .with_id(id)
607 .build()
608 }
609 DataType::FixedSizeBinary(length) => {
610 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
611 .with_repetition(repetition)
612 .with_id(id)
613 .with_length(*length)
614 .with_logical_type(logical_type_for_fixed_size_binary(field))
615 .build()
616 }
617 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
618 .with_repetition(repetition)
619 .with_id(id)
620 .build(),
621 DataType::Decimal32(precision, scale)
622 | DataType::Decimal64(precision, scale)
623 | DataType::Decimal128(precision, scale)
624 | DataType::Decimal256(precision, scale) => {
625 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
628 (PhysicalType::INT32, -1)
629 } else if *precision <= 18 {
630 (PhysicalType::INT64, -1)
631 } else {
632 (
633 PhysicalType::FIXED_LEN_BYTE_ARRAY,
634 decimal_length_from_precision(*precision) as i32,
635 )
636 };
637 Type::primitive_type_builder(name, physical_type)
638 .with_repetition(repetition)
639 .with_id(id)
640 .with_length(length)
641 .with_logical_type(Some(LogicalType::Decimal {
642 scale: *scale as i32,
643 precision: *precision as i32,
644 }))
645 .with_precision(*precision as i32)
646 .with_scale(*scale as i32)
647 .build()
648 }
649 DataType::Utf8 | DataType::LargeUtf8 => {
650 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
651 .with_logical_type(logical_type_for_string(field))
652 .with_repetition(repetition)
653 .with_id(id)
654 .build()
655 }
656 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
657 .with_logical_type(logical_type_for_string(field))
658 .with_repetition(repetition)
659 .with_id(id)
660 .build(),
661 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
662 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
663 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
665 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
666 } else {
667 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
668 };
669
670 Type::group_type_builder(name)
671 .with_fields(vec![Arc::new(
672 Type::group_type_builder("list")
673 .with_fields(vec![field_ref])
674 .with_repetition(Repetition::REPEATED)
675 .build()?,
676 )])
677 .with_logical_type(Some(LogicalType::List))
678 .with_repetition(repetition)
679 .with_id(id)
680 .build()
681 }
682 DataType::ListView(_) | DataType::LargeListView(_) => {
683 unimplemented!("ListView/LargeListView not implemented")
684 }
685 DataType::Struct(fields) => {
686 if fields.is_empty() {
687 return Err(arrow_err!("Parquet does not support writing empty structs",));
688 }
689 let fields = fields
691 .iter()
692 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
693 .collect::<Result<_>>()?;
694 Type::group_type_builder(name)
695 .with_fields(fields)
696 .with_repetition(repetition)
697 .with_id(id)
698 .with_logical_type(logical_type_for_struct(field))
699 .build()
700 }
701 DataType::Map(field, _) => {
702 if let DataType::Struct(struct_fields) = field.data_type() {
703 let map_struct_name = if coerce_types {
705 PARQUET_MAP_STRUCT_NAME
706 } else {
707 field.name()
708 };
709
710 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
712 if coerce_types && fld.name() != name {
713 let f = fld.as_ref().clone().with_name(name);
714 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
715 } else {
716 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
717 }
718 };
719 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
720 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
721
722 Type::group_type_builder(name)
723 .with_fields(vec![Arc::new(
724 Type::group_type_builder(map_struct_name)
725 .with_fields(vec![key_field, val_field])
726 .with_repetition(Repetition::REPEATED)
727 .build()?,
728 )])
729 .with_logical_type(Some(LogicalType::Map))
730 .with_repetition(repetition)
731 .with_id(id)
732 .build()
733 } else {
734 Err(arrow_err!(
735 "DataType::Map should contain a struct field child",
736 ))
737 }
738 }
739 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
740 DataType::Dictionary(_, value) => {
741 let dict_field = field.clone().with_data_type(value.as_ref().clone());
743 arrow_to_parquet_type(&dict_field, coerce_types)
744 }
745 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
746 "Converting RunEndEncodedType to parquet not supported",
747 )),
748 }
749}
750
751fn field_id(field: &Field) -> Option<i32> {
752 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
753 value.parse().ok() }
755
756#[cfg(test)]
757mod tests {
758 use super::*;
759
760 use std::{collections::HashMap, sync::Arc};
761
762 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
763 use crate::file::metadata::KeyValue;
764 use crate::file::reader::FileReader;
765 use crate::{
766 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
767 schema::{parser::parse_message_type, types::SchemaDescriptor},
768 };
769 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
770
771 #[test]
772 fn test_flat_primitives() {
773 let message_type = "
774 message test_schema {
775 REQUIRED BOOLEAN boolean;
776 REQUIRED INT32 int8 (INT_8);
777 REQUIRED INT32 int16 (INT_16);
778 REQUIRED INT32 uint8 (INTEGER(8,false));
779 REQUIRED INT32 uint16 (INTEGER(16,false));
780 REQUIRED INT32 int32;
781 REQUIRED INT64 int64;
782 OPTIONAL DOUBLE double;
783 OPTIONAL FLOAT float;
784 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
785 OPTIONAL BINARY string (UTF8);
786 OPTIONAL BINARY string_2 (STRING);
787 OPTIONAL BINARY json (JSON);
788 }
789 ";
790 let parquet_group_type = parse_message_type(message_type).unwrap();
791
792 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
793 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
794
795 let arrow_fields = Fields::from(vec![
796 Field::new("boolean", DataType::Boolean, false),
797 Field::new("int8", DataType::Int8, false),
798 Field::new("int16", DataType::Int16, false),
799 Field::new("uint8", DataType::UInt8, false),
800 Field::new("uint16", DataType::UInt16, false),
801 Field::new("int32", DataType::Int32, false),
802 Field::new("int64", DataType::Int64, false),
803 Field::new("double", DataType::Float64, true),
804 Field::new("float", DataType::Float32, true),
805 Field::new("float16", DataType::Float16, true),
806 Field::new("string", DataType::Utf8, true),
807 Field::new("string_2", DataType::Utf8, true),
808 json_field(),
809 ]);
810
811 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
812 }
813
814 fn json_field() -> Field {
817 #[cfg(feature = "arrow_canonical_extension_types")]
818 {
819 Field::new("json", DataType::Utf8, true)
820 .with_extension_type(arrow_schema::extension::Json::default())
821 }
822 #[cfg(not(feature = "arrow_canonical_extension_types"))]
823 {
824 Field::new("json", DataType::Utf8, true)
825 }
826 }
827
828 #[test]
829 fn test_decimal_fields() {
830 let message_type = "
831 message test_schema {
832 REQUIRED INT32 decimal1 (DECIMAL(4,2));
833 REQUIRED INT64 decimal2 (DECIMAL(12,2));
834 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
835 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
836 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
837 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
838 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
839 }
840 ";
841
842 let parquet_group_type = parse_message_type(message_type).unwrap();
843
844 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
845 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
846
847 let arrow_fields = Fields::from(vec![
848 Field::new("decimal1", DataType::Decimal128(4, 2), false),
849 Field::new("decimal2", DataType::Decimal128(12, 2), false),
850 Field::new("decimal3", DataType::Decimal128(30, 2), false),
851 Field::new("decimal4", DataType::Decimal128(33, 2), false),
852 Field::new("decimal5", DataType::Decimal128(38, 2), false),
853 Field::new("decimal6", DataType::Decimal256(39, 2), false),
854 Field::new("decimal7", DataType::Decimal256(39, 2), false),
855 ]);
856 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
857 }
858
859 #[test]
860 fn test_byte_array_fields() {
861 let message_type = "
862 message test_schema {
863 REQUIRED BYTE_ARRAY binary;
864 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
865 }
866 ";
867
868 let parquet_group_type = parse_message_type(message_type).unwrap();
869
870 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
871 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
872
873 let arrow_fields = Fields::from(vec![
874 Field::new("binary", DataType::Binary, false),
875 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
876 ]);
877 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
878 }
879
880 #[test]
881 fn test_duplicate_fields() {
882 let message_type = "
883 message test_schema {
884 REQUIRED BOOLEAN boolean;
885 REQUIRED INT32 int8 (INT_8);
886 }
887 ";
888
889 let parquet_group_type = parse_message_type(message_type).unwrap();
890
891 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
892 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
893
894 let arrow_fields = Fields::from(vec![
895 Field::new("boolean", DataType::Boolean, false),
896 Field::new("int8", DataType::Int8, false),
897 ]);
898 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
899
900 let converted_arrow_schema =
901 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
902 .unwrap();
903 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
904 }
905
906 #[test]
907 fn test_parquet_lists() {
908 let mut arrow_fields = Vec::new();
909
910 let message_type = "
912 message test_schema {
913 REQUIRED GROUP my_list (LIST) {
914 REPEATED GROUP list {
915 OPTIONAL BINARY element (UTF8);
916 }
917 }
918 OPTIONAL GROUP my_list (LIST) {
919 REPEATED GROUP list {
920 REQUIRED BINARY element (UTF8);
921 }
922 }
923 OPTIONAL GROUP array_of_arrays (LIST) {
924 REPEATED GROUP list {
925 REQUIRED GROUP element (LIST) {
926 REPEATED GROUP list {
927 REQUIRED INT32 element;
928 }
929 }
930 }
931 }
932 OPTIONAL GROUP my_list (LIST) {
933 REPEATED GROUP element {
934 REQUIRED BINARY str (UTF8);
935 }
936 }
937 OPTIONAL GROUP my_list (LIST) {
938 REPEATED INT32 element;
939 }
940 OPTIONAL GROUP my_list (LIST) {
941 REPEATED GROUP element {
942 REQUIRED BINARY str (UTF8);
943 REQUIRED INT32 num;
944 }
945 }
946 OPTIONAL GROUP my_list (LIST) {
947 REPEATED GROUP array {
948 REQUIRED BINARY str (UTF8);
949 }
950
951 }
952 OPTIONAL GROUP my_list (LIST) {
953 REPEATED GROUP my_list_tuple {
954 REQUIRED BINARY str (UTF8);
955 }
956 }
957 REPEATED INT32 name;
958 }
959 ";
960
961 {
968 arrow_fields.push(Field::new_list(
969 "my_list",
970 Field::new("element", DataType::Utf8, true),
971 false,
972 ));
973 }
974
975 {
982 arrow_fields.push(Field::new_list(
983 "my_list",
984 Field::new("element", DataType::Utf8, false),
985 true,
986 ));
987 }
988
989 {
1002 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1003 arrow_fields.push(Field::new_list(
1004 "array_of_arrays",
1005 Field::new_list("element", arrow_inner_list, false),
1006 true,
1007 ));
1008 }
1009
1010 {
1017 arrow_fields.push(Field::new_list(
1018 "my_list",
1019 Field::new("str", DataType::Utf8, false),
1020 true,
1021 ));
1022 }
1023
1024 {
1029 arrow_fields.push(Field::new_list(
1030 "my_list",
1031 Field::new("element", DataType::Int32, false),
1032 true,
1033 ));
1034 }
1035
1036 {
1044 let fields = vec![
1045 Field::new("str", DataType::Utf8, false),
1046 Field::new("num", DataType::Int32, false),
1047 ];
1048 arrow_fields.push(Field::new_list(
1049 "my_list",
1050 Field::new_struct("element", fields, false),
1051 true,
1052 ));
1053 }
1054
1055 {
1063 let fields = vec![Field::new("str", DataType::Utf8, false)];
1064 arrow_fields.push(Field::new_list(
1065 "my_list",
1066 Field::new_struct("array", fields, false),
1067 true,
1068 ));
1069 }
1070
1071 {
1079 let fields = vec![Field::new("str", DataType::Utf8, false)];
1080 arrow_fields.push(Field::new_list(
1081 "my_list",
1082 Field::new_struct("my_list_tuple", fields, false),
1083 true,
1084 ));
1085 }
1086
1087 {
1090 arrow_fields.push(Field::new_list(
1091 "name",
1092 Field::new("name", DataType::Int32, false),
1093 false,
1094 ));
1095 }
1096
1097 let parquet_group_type = parse_message_type(message_type).unwrap();
1098
1099 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1100 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1101 let converted_fields = converted_arrow_schema.fields();
1102
1103 assert_eq!(arrow_fields.len(), converted_fields.len());
1104 for i in 0..arrow_fields.len() {
1105 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1106 }
1107 }
1108
1109 #[test]
1110 fn test_parquet_list_nullable() {
1111 let mut arrow_fields = Vec::new();
1112
1113 let message_type = "
1114 message test_schema {
1115 REQUIRED GROUP my_list1 (LIST) {
1116 REPEATED GROUP list {
1117 OPTIONAL BINARY element (UTF8);
1118 }
1119 }
1120 OPTIONAL GROUP my_list2 (LIST) {
1121 REPEATED GROUP list {
1122 REQUIRED BINARY element (UTF8);
1123 }
1124 }
1125 REQUIRED GROUP my_list3 (LIST) {
1126 REPEATED GROUP list {
1127 REQUIRED BINARY element (UTF8);
1128 }
1129 }
1130 }
1131 ";
1132
1133 {
1140 arrow_fields.push(Field::new_list(
1141 "my_list1",
1142 Field::new("element", DataType::Utf8, true),
1143 false,
1144 ));
1145 }
1146
1147 {
1154 arrow_fields.push(Field::new_list(
1155 "my_list2",
1156 Field::new("element", DataType::Utf8, false),
1157 true,
1158 ));
1159 }
1160
1161 {
1168 arrow_fields.push(Field::new_list(
1169 "my_list3",
1170 Field::new("element", DataType::Utf8, false),
1171 false,
1172 ));
1173 }
1174
1175 let parquet_group_type = parse_message_type(message_type).unwrap();
1176
1177 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1178 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1179 let converted_fields = converted_arrow_schema.fields();
1180
1181 assert_eq!(arrow_fields.len(), converted_fields.len());
1182 for i in 0..arrow_fields.len() {
1183 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1184 }
1185 }
1186
1187 #[test]
1188 fn test_parquet_maps() {
1189 let mut arrow_fields = Vec::new();
1190
1191 let message_type = "
1193 message test_schema {
1194 REQUIRED group my_map1 (MAP) {
1195 REPEATED group key_value {
1196 REQUIRED binary key (UTF8);
1197 OPTIONAL int32 value;
1198 }
1199 }
1200 OPTIONAL group my_map2 (MAP) {
1201 REPEATED group map {
1202 REQUIRED binary str (UTF8);
1203 REQUIRED int32 num;
1204 }
1205 }
1206 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1207 REPEATED group map {
1208 REQUIRED binary key (UTF8);
1209 OPTIONAL int32 value;
1210 }
1211 }
1212 REQUIRED group my_map4 (MAP) {
1213 REPEATED group map {
1214 OPTIONAL binary key (UTF8);
1215 REQUIRED int32 value;
1216 }
1217 }
1218 }
1219 ";
1220
1221 {
1229 arrow_fields.push(Field::new_map(
1230 "my_map1",
1231 "key_value",
1232 Field::new("key", DataType::Utf8, false),
1233 Field::new("value", DataType::Int32, true),
1234 false,
1235 false,
1236 ));
1237 }
1238
1239 {
1247 arrow_fields.push(Field::new_map(
1248 "my_map2",
1249 "map",
1250 Field::new("str", DataType::Utf8, false),
1251 Field::new("num", DataType::Int32, false),
1252 false,
1253 true,
1254 ));
1255 }
1256
1257 {
1265 arrow_fields.push(Field::new_map(
1266 "my_map3",
1267 "map",
1268 Field::new("key", DataType::Utf8, false),
1269 Field::new("value", DataType::Int32, true),
1270 false,
1271 true,
1272 ));
1273 }
1274
1275 {
1283 arrow_fields.push(Field::new_map(
1284 "my_map4",
1285 "map",
1286 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1288 false,
1289 false,
1290 ));
1291 }
1292
1293 let parquet_group_type = parse_message_type(message_type).unwrap();
1294
1295 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1296 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1297 let converted_fields = converted_arrow_schema.fields();
1298
1299 assert_eq!(arrow_fields.len(), converted_fields.len());
1300 for i in 0..arrow_fields.len() {
1301 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1302 }
1303 }
1304
1305 #[test]
1306 fn test_nested_schema() {
1307 let mut arrow_fields = Vec::new();
1308 {
1309 let group1_fields = Fields::from(vec![
1310 Field::new("leaf1", DataType::Boolean, false),
1311 Field::new("leaf2", DataType::Int32, false),
1312 ]);
1313 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1314 arrow_fields.push(group1_struct);
1315
1316 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1317 arrow_fields.push(leaf3_field);
1318 }
1319
1320 let message_type = "
1321 message test_schema {
1322 REQUIRED GROUP group1 {
1323 REQUIRED BOOLEAN leaf1;
1324 REQUIRED INT32 leaf2;
1325 }
1326 REQUIRED INT64 leaf3;
1327 }
1328 ";
1329 let parquet_group_type = parse_message_type(message_type).unwrap();
1330
1331 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1332 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1333 let converted_fields = converted_arrow_schema.fields();
1334
1335 assert_eq!(arrow_fields.len(), converted_fields.len());
1336 for i in 0..arrow_fields.len() {
1337 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1338 }
1339 }
1340
1341 #[test]
1342 fn test_nested_schema_partial() {
1343 let mut arrow_fields = Vec::new();
1344 {
1345 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1346 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1347 arrow_fields.push(group1);
1348
1349 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1350 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1351 arrow_fields.push(group2);
1352
1353 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1354 }
1355
1356 let message_type = "
1357 message test_schema {
1358 REQUIRED GROUP group1 {
1359 REQUIRED INT64 leaf1;
1360 REQUIRED INT64 leaf2;
1361 }
1362 REQUIRED GROUP group2 {
1363 REQUIRED INT64 leaf3;
1364 REQUIRED INT64 leaf4;
1365 }
1366 REQUIRED INT64 leaf5;
1367 }
1368 ";
1369 let parquet_group_type = parse_message_type(message_type).unwrap();
1370
1371 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1381 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1382 let converted_arrow_schema =
1383 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1384 let converted_fields = converted_arrow_schema.fields();
1385
1386 assert_eq!(arrow_fields.len(), converted_fields.len());
1387 for i in 0..arrow_fields.len() {
1388 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1389 }
1390 }
1391
1392 #[test]
1393 fn test_nested_schema_partial_ordering() {
1394 let mut arrow_fields = Vec::new();
1395 {
1396 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1397 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1398 arrow_fields.push(group1);
1399
1400 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1401 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1402 arrow_fields.push(group2);
1403
1404 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1405 }
1406
1407 let message_type = "
1408 message test_schema {
1409 REQUIRED GROUP group1 {
1410 REQUIRED INT64 leaf1;
1411 REQUIRED INT64 leaf2;
1412 }
1413 REQUIRED GROUP group2 {
1414 REQUIRED INT64 leaf3;
1415 REQUIRED INT64 leaf4;
1416 }
1417 REQUIRED INT64 leaf5;
1418 }
1419 ";
1420 let parquet_group_type = parse_message_type(message_type).unwrap();
1421
1422 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1432 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1433 let converted_arrow_schema =
1434 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1435 let converted_fields = converted_arrow_schema.fields();
1436
1437 assert_eq!(arrow_fields.len(), converted_fields.len());
1438 for i in 0..arrow_fields.len() {
1439 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1440 }
1441
1442 let mask =
1443 ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1444 let converted_arrow_schema =
1445 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1446 let converted_fields = converted_arrow_schema.fields();
1447
1448 assert_eq!(arrow_fields.len(), converted_fields.len());
1449 for i in 0..arrow_fields.len() {
1450 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1451 }
1452 }
1453
1454 #[test]
1455 fn test_repeated_nested_schema() {
1456 let mut arrow_fields = Vec::new();
1457 {
1458 arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1459
1460 let inner_group_list = Field::new_list(
1461 "innerGroup",
1462 Field::new_struct(
1463 "innerGroup",
1464 vec![Field::new("leaf3", DataType::Int32, true)],
1465 false,
1466 ),
1467 false,
1468 );
1469
1470 let outer_group_list = Field::new_list(
1471 "outerGroup",
1472 Field::new_struct(
1473 "outerGroup",
1474 vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1475 false,
1476 ),
1477 false,
1478 );
1479 arrow_fields.push(outer_group_list);
1480 }
1481
1482 let message_type = "
1483 message test_schema {
1484 OPTIONAL INT32 leaf1;
1485 REPEATED GROUP outerGroup {
1486 OPTIONAL INT32 leaf2;
1487 REPEATED GROUP innerGroup {
1488 OPTIONAL INT32 leaf3;
1489 }
1490 }
1491 }
1492 ";
1493 let parquet_group_type = parse_message_type(message_type).unwrap();
1494
1495 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1496 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1497 let converted_fields = converted_arrow_schema.fields();
1498
1499 assert_eq!(arrow_fields.len(), converted_fields.len());
1500 for i in 0..arrow_fields.len() {
1501 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1502 }
1503 }
1504
1505 #[test]
1506 fn test_column_desc_to_field() {
1507 let message_type = "
1508 message test_schema {
1509 REQUIRED BOOLEAN boolean;
1510 REQUIRED INT32 int8 (INT_8);
1511 REQUIRED INT32 uint8 (INTEGER(8,false));
1512 REQUIRED INT32 int16 (INT_16);
1513 REQUIRED INT32 uint16 (INTEGER(16,false));
1514 REQUIRED INT32 int32;
1515 REQUIRED INT64 int64;
1516 OPTIONAL DOUBLE double;
1517 OPTIONAL FLOAT float;
1518 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1519 OPTIONAL BINARY string (UTF8);
1520 REPEATED BOOLEAN bools;
1521 OPTIONAL INT32 date (DATE);
1522 OPTIONAL INT32 time_milli (TIME_MILLIS);
1523 OPTIONAL INT64 time_micro (TIME_MICROS);
1524 OPTIONAL INT64 time_nano (TIME(NANOS,false));
1525 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1526 REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1527 REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1528 REPEATED INT32 int_list;
1529 REPEATED BINARY byte_list;
1530 REPEATED BINARY string_list (UTF8);
1531 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1532 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1533 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1534 }
1535 ";
1536 let parquet_group_type = parse_message_type(message_type).unwrap();
1537
1538 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1539 let converted_arrow_fields = parquet_schema
1540 .columns()
1541 .iter()
1542 .map(|c| parquet_to_arrow_field(c).unwrap())
1543 .collect::<Vec<Field>>();
1544
1545 let arrow_fields = vec![
1546 Field::new("boolean", DataType::Boolean, false),
1547 Field::new("int8", DataType::Int8, false),
1548 Field::new("uint8", DataType::UInt8, false),
1549 Field::new("int16", DataType::Int16, false),
1550 Field::new("uint16", DataType::UInt16, false),
1551 Field::new("int32", DataType::Int32, false),
1552 Field::new("int64", DataType::Int64, false),
1553 Field::new("double", DataType::Float64, true),
1554 Field::new("float", DataType::Float32, true),
1555 Field::new("float16", DataType::Float16, true),
1556 Field::new("string", DataType::Utf8, true),
1557 Field::new_list(
1558 "bools",
1559 Field::new("bools", DataType::Boolean, false),
1560 false,
1561 ),
1562 Field::new("date", DataType::Date32, true),
1563 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1564 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1565 Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1566 Field::new(
1567 "ts_milli",
1568 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1569 true,
1570 ),
1571 Field::new(
1572 "ts_micro",
1573 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1574 false,
1575 ),
1576 Field::new(
1577 "ts_nano",
1578 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1579 false,
1580 ),
1581 Field::new_list(
1582 "int_list",
1583 Field::new("int_list", DataType::Int32, false),
1584 false,
1585 ),
1586 Field::new_list(
1587 "byte_list",
1588 Field::new("byte_list", DataType::Binary, false),
1589 false,
1590 ),
1591 Field::new_list(
1592 "string_list",
1593 Field::new("string_list", DataType::Utf8, false),
1594 false,
1595 ),
1596 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1597 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1598 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1599 ];
1600
1601 assert_eq!(arrow_fields, converted_arrow_fields);
1602 }
1603
1604 #[test]
1605 fn test_coerced_map_list() {
1606 let arrow_fields = vec![
1608 Field::new_list(
1609 "my_list",
1610 Field::new("item", DataType::Boolean, true),
1611 false,
1612 ),
1613 Field::new_map(
1614 "my_map",
1615 "entries",
1616 Field::new("keys", DataType::Utf8, false),
1617 Field::new("values", DataType::Int32, true),
1618 false,
1619 true,
1620 ),
1621 ];
1622 let arrow_schema = Schema::new(arrow_fields);
1623
1624 let message_type = "
1626 message parquet_schema {
1627 REQUIRED GROUP my_list (LIST) {
1628 REPEATED GROUP list {
1629 OPTIONAL BOOLEAN element;
1630 }
1631 }
1632 OPTIONAL GROUP my_map (MAP) {
1633 REPEATED GROUP key_value {
1634 REQUIRED BINARY key (STRING);
1635 OPTIONAL INT32 value;
1636 }
1637 }
1638 }
1639 ";
1640 let parquet_group_type = parse_message_type(message_type).unwrap();
1641 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1642 let converted_arrow_schema = ArrowSchemaConverter::new()
1643 .with_coerce_types(true)
1644 .convert(&arrow_schema)
1645 .unwrap();
1646 assert_eq!(
1647 parquet_schema.columns().len(),
1648 converted_arrow_schema.columns().len()
1649 );
1650
1651 let message_type = "
1653 message parquet_schema {
1654 REQUIRED GROUP my_list (LIST) {
1655 REPEATED GROUP list {
1656 OPTIONAL BOOLEAN item;
1657 }
1658 }
1659 OPTIONAL GROUP my_map (MAP) {
1660 REPEATED GROUP entries {
1661 REQUIRED BINARY keys (STRING);
1662 OPTIONAL INT32 values;
1663 }
1664 }
1665 }
1666 ";
1667 let parquet_group_type = parse_message_type(message_type).unwrap();
1668 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1669 let converted_arrow_schema = ArrowSchemaConverter::new()
1670 .with_coerce_types(false)
1671 .convert(&arrow_schema)
1672 .unwrap();
1673 assert_eq!(
1674 parquet_schema.columns().len(),
1675 converted_arrow_schema.columns().len()
1676 );
1677 }
1678
1679 #[test]
1680 fn test_field_to_column_desc() {
1681 let message_type = "
1682 message arrow_schema {
1683 REQUIRED BOOLEAN boolean;
1684 REQUIRED INT32 int8 (INT_8);
1685 REQUIRED INT32 int16 (INTEGER(16,true));
1686 REQUIRED INT32 int32;
1687 REQUIRED INT64 int64;
1688 OPTIONAL DOUBLE double;
1689 OPTIONAL FLOAT float;
1690 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1691 OPTIONAL BINARY string (STRING);
1692 OPTIONAL GROUP bools (LIST) {
1693 REPEATED GROUP list {
1694 OPTIONAL BOOLEAN element;
1695 }
1696 }
1697 REQUIRED GROUP bools_non_null (LIST) {
1698 REPEATED GROUP list {
1699 REQUIRED BOOLEAN element;
1700 }
1701 }
1702 OPTIONAL INT32 date (DATE);
1703 OPTIONAL INT32 time_milli (TIME(MILLIS,false));
1704 OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
1705 OPTIONAL INT64 time_micro (TIME_MICROS);
1706 OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
1707 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1708 REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
1709 REQUIRED INT64 ts_seconds;
1710 REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
1711 REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1712 REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1713 REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
1714 REQUIRED GROUP struct {
1715 REQUIRED BOOLEAN bools;
1716 REQUIRED INT32 uint32 (INTEGER(32,false));
1717 REQUIRED GROUP int32 (LIST) {
1718 REPEATED GROUP list {
1719 OPTIONAL INT32 element;
1720 }
1721 }
1722 }
1723 REQUIRED BINARY dictionary_strings (STRING);
1724 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1725 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1726 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1727 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1728 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1729 }
1730 ";
1731 let parquet_group_type = parse_message_type(message_type).unwrap();
1732
1733 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1734
1735 let arrow_fields = vec![
1736 Field::new("boolean", DataType::Boolean, false),
1737 Field::new("int8", DataType::Int8, false),
1738 Field::new("int16", DataType::Int16, false),
1739 Field::new("int32", DataType::Int32, false),
1740 Field::new("int64", DataType::Int64, false),
1741 Field::new("double", DataType::Float64, true),
1742 Field::new("float", DataType::Float32, true),
1743 Field::new("float16", DataType::Float16, true),
1744 Field::new("string", DataType::Utf8, true),
1745 Field::new_list(
1746 "bools",
1747 Field::new("element", DataType::Boolean, true),
1748 true,
1749 ),
1750 Field::new_list(
1751 "bools_non_null",
1752 Field::new("element", DataType::Boolean, false),
1753 false,
1754 ),
1755 Field::new("date", DataType::Date32, true),
1756 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1757 Field::new(
1758 "time_milli_utc",
1759 DataType::Time32(TimeUnit::Millisecond),
1760 true,
1761 )
1762 .with_metadata(HashMap::from_iter(vec![(
1763 "adjusted_to_utc".to_string(),
1764 "".to_string(),
1765 )])),
1766 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1767 Field::new(
1768 "time_micro_utc",
1769 DataType::Time64(TimeUnit::Microsecond),
1770 true,
1771 )
1772 .with_metadata(HashMap::from_iter(vec![(
1773 "adjusted_to_utc".to_string(),
1774 "".to_string(),
1775 )])),
1776 Field::new(
1777 "ts_milli",
1778 DataType::Timestamp(TimeUnit::Millisecond, None),
1779 true,
1780 ),
1781 Field::new(
1782 "ts_micro",
1783 DataType::Timestamp(TimeUnit::Microsecond, None),
1784 false,
1785 ),
1786 Field::new(
1787 "ts_seconds",
1788 DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1789 false,
1790 ),
1791 Field::new(
1792 "ts_micro_utc",
1793 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1794 false,
1795 ),
1796 Field::new(
1797 "ts_millis_zero_offset",
1798 DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1799 false,
1800 ),
1801 Field::new(
1802 "ts_millis_zero_negative_offset",
1803 DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1804 false,
1805 ),
1806 Field::new(
1807 "ts_micro_non_utc",
1808 DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1809 false,
1810 ),
1811 Field::new_struct(
1812 "struct",
1813 vec![
1814 Field::new("bools", DataType::Boolean, false),
1815 Field::new("uint32", DataType::UInt32, false),
1816 Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1817 ],
1818 false,
1819 ),
1820 Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1821 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1822 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1823 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1824 Field::new("decimal128", DataType::Decimal128(38, 2), false),
1825 Field::new("decimal256", DataType::Decimal256(39, 2), false),
1826 ];
1827 let arrow_schema = Schema::new(arrow_fields);
1828 let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1829
1830 assert_eq!(
1831 parquet_schema.columns().len(),
1832 converted_arrow_schema.columns().len()
1833 );
1834 parquet_schema
1835 .columns()
1836 .iter()
1837 .zip(converted_arrow_schema.columns())
1838 .for_each(|(a, b)| {
1839 match a.logical_type() {
1844 Some(_) => {
1845 assert_eq!(a, b)
1846 }
1847 None => {
1848 assert_eq!(a.name(), b.name());
1849 assert_eq!(a.physical_type(), b.physical_type());
1850 assert_eq!(a.converted_type(), b.converted_type());
1851 }
1852 };
1853 });
1854 }
1855
1856 #[test]
1857 #[should_panic(expected = "Parquet does not support writing empty structs")]
1858 fn test_empty_struct_field() {
1859 let arrow_fields = vec![Field::new(
1860 "struct",
1861 DataType::Struct(Fields::empty()),
1862 false,
1863 )];
1864 let arrow_schema = Schema::new(arrow_fields);
1865 let converted_arrow_schema = ArrowSchemaConverter::new()
1866 .with_coerce_types(true)
1867 .convert(&arrow_schema);
1868
1869 converted_arrow_schema.unwrap();
1870 }
1871
1872 #[test]
1873 fn test_metadata() {
1874 let message_type = "
1875 message test_schema {
1876 OPTIONAL BINARY string (STRING);
1877 }
1878 ";
1879 let parquet_group_type = parse_message_type(message_type).unwrap();
1880
1881 let key_value_metadata = vec![
1882 KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1883 KeyValue::new("baz".to_owned(), None),
1884 ];
1885
1886 let mut expected_metadata: HashMap<String, String> = HashMap::new();
1887 expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1888
1889 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1890 let converted_arrow_schema =
1891 parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1892
1893 assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1894 }
1895
1896 #[test]
1897 fn test_arrow_schema_roundtrip() -> Result<()> {
1898 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1899 a.iter()
1900 .map(|(a, b)| (a.to_string(), b.to_string()))
1901 .collect()
1902 };
1903
1904 let schema = Schema::new_with_metadata(
1905 vec![
1906 Field::new("c1", DataType::Utf8, false)
1907 .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1908 Field::new("c2", DataType::Binary, false),
1909 Field::new("c3", DataType::FixedSizeBinary(3), false),
1910 Field::new("c4", DataType::Boolean, false),
1911 Field::new("c5", DataType::Date32, false),
1912 Field::new("c6", DataType::Date64, false),
1913 Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1914 Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1915 Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1916 Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1917 Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1918 Field::new(
1919 "c16",
1920 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1921 false,
1922 ),
1923 Field::new(
1924 "c17",
1925 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1926 false,
1927 ),
1928 Field::new(
1929 "c18",
1930 DataType::Timestamp(TimeUnit::Nanosecond, None),
1931 false,
1932 ),
1933 Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1934 Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1935 Field::new_list(
1936 "c21",
1937 Field::new_list_field(DataType::Boolean, true)
1938 .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1939 false,
1940 )
1941 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1942 Field::new(
1943 "c22",
1944 DataType::FixedSizeList(
1945 Arc::new(Field::new_list_field(DataType::Boolean, true)),
1946 5,
1947 ),
1948 false,
1949 ),
1950 Field::new_list(
1951 "c23",
1952 Field::new_large_list(
1953 "inner",
1954 Field::new_list_field(
1955 DataType::Struct(
1956 vec![
1957 Field::new("a", DataType::Int16, true),
1958 Field::new("b", DataType::Float64, false),
1959 Field::new("c", DataType::Float32, false),
1960 Field::new("d", DataType::Float16, false),
1961 ]
1962 .into(),
1963 ),
1964 false,
1965 ),
1966 true,
1967 ),
1968 false,
1969 ),
1970 Field::new(
1971 "c24",
1972 DataType::Struct(Fields::from(vec![
1973 Field::new("a", DataType::Utf8, false),
1974 Field::new("b", DataType::UInt16, false),
1975 ])),
1976 false,
1977 ),
1978 Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
1979 Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
1980 #[allow(deprecated)]
1986 Field::new_dict(
1987 "c31",
1988 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1989 true,
1990 123,
1991 true,
1992 )
1993 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
1994 Field::new("c32", DataType::LargeBinary, true),
1995 Field::new("c33", DataType::LargeUtf8, true),
1996 Field::new_large_list(
1997 "c34",
1998 Field::new_list(
1999 "inner",
2000 Field::new_list_field(
2001 DataType::Struct(
2002 vec![
2003 Field::new("a", DataType::Int16, true),
2004 Field::new("b", DataType::Float64, true),
2005 ]
2006 .into(),
2007 ),
2008 true,
2009 ),
2010 true,
2011 ),
2012 true,
2013 ),
2014 Field::new("c35", DataType::Null, true),
2015 Field::new("c36", DataType::Decimal128(2, 1), false),
2016 Field::new("c37", DataType::Decimal256(50, 20), false),
2017 Field::new("c38", DataType::Decimal128(18, 12), true),
2018 Field::new_map(
2019 "c39",
2020 "key_value",
2021 Field::new("key", DataType::Utf8, false),
2022 Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2023 false, true,
2025 ),
2026 Field::new_map(
2027 "c40",
2028 "my_entries",
2029 Field::new("my_key", DataType::Utf8, false)
2030 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2031 Field::new_list(
2032 "my_value",
2033 Field::new_list_field(DataType::Utf8, true)
2034 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2035 true,
2036 )
2037 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2038 false, true,
2040 )
2041 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2042 Field::new_map(
2043 "c41",
2044 "my_entries",
2045 Field::new("my_key", DataType::Utf8, false),
2046 Field::new_list(
2047 "my_value",
2048 Field::new_list_field(DataType::Utf8, true)
2049 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2050 true,
2051 ),
2052 false, false,
2054 ),
2055 Field::new("c42", DataType::Decimal32(5, 2), false),
2056 Field::new("c43", DataType::Decimal64(18, 12), true),
2057 ],
2058 meta(&[("Key", "Value")]),
2059 );
2060
2061 let file = tempfile::tempfile().unwrap();
2063 let writer =
2064 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2065 writer.close()?;
2066
2067 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2069
2070 let read_schema = arrow_reader.schema();
2072 assert_eq!(&schema, read_schema.as_ref());
2073
2074 let mut stack = Vec::with_capacity(10);
2076 let mut out = Vec::with_capacity(10);
2077
2078 let root = arrow_reader.parquet_schema().root_schema_ptr();
2079 stack.push((root.name().to_string(), root));
2080
2081 while let Some((p, t)) = stack.pop() {
2082 if t.is_group() {
2083 for f in t.get_fields() {
2084 stack.push((format!("{p}.{}", f.name()), f.clone()))
2085 }
2086 }
2087
2088 let info = t.get_basic_info();
2089 if info.has_id() {
2090 out.push(format!("{p} -> {}", info.id()))
2091 }
2092 }
2093 out.sort_unstable();
2094 let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2095
2096 assert_eq!(
2097 &out,
2098 &[
2099 "arrow_schema.c1 -> 2",
2100 "arrow_schema.c21 -> 4",
2101 "arrow_schema.c21.list.item -> 5",
2102 "arrow_schema.c31 -> 6",
2103 "arrow_schema.c40 -> 7",
2104 "arrow_schema.c40.my_entries.my_key -> 8",
2105 "arrow_schema.c40.my_entries.my_value -> 9",
2106 "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2107 "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2108 ]
2109 );
2110
2111 Ok(())
2112 }
2113
2114 #[test]
2115 fn test_read_parquet_field_ids_raw() -> Result<()> {
2116 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2117 a.iter()
2118 .map(|(a, b)| (a.to_string(), b.to_string()))
2119 .collect()
2120 };
2121 let schema = Schema::new_with_metadata(
2122 vec![
2123 Field::new("c1", DataType::Utf8, true)
2124 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2125 Field::new("c2", DataType::Utf8, true)
2126 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2127 ],
2128 HashMap::new(),
2129 );
2130
2131 let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2132 let parquet_bytes = writer.into_inner()?;
2133
2134 let reader =
2135 crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2136 let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2137
2138 let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2140
2141 let parq_schema_descr = ArrowSchemaConverter::new()
2142 .with_coerce_types(true)
2143 .convert(&arrow_schema)?;
2144 let parq_fields = parq_schema_descr.root_schema().get_fields();
2145 assert_eq!(parq_fields.len(), 2);
2146 assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2147 assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2148
2149 Ok(())
2150 }
2151
2152 #[test]
2153 fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2154 let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2155 .iter()
2156 .cloned()
2157 .collect();
2158
2159 let schema = Schema::new_with_metadata(
2160 vec![
2161 Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2162 Field::new(
2163 "c22",
2164 DataType::FixedSizeList(
2165 Arc::new(Field::new("items", DataType::Boolean, false)),
2166 5,
2167 ),
2168 false,
2169 ),
2170 Field::new_list(
2171 "c23",
2172 Field::new_large_list(
2173 "items",
2174 Field::new_struct(
2175 "items",
2176 vec![
2177 Field::new("a", DataType::Int16, true),
2178 Field::new("b", DataType::Float64, false),
2179 ],
2180 true,
2181 ),
2182 true,
2183 ),
2184 true,
2185 ),
2186 ],
2187 metadata,
2188 );
2189
2190 let file = tempfile::tempfile().unwrap();
2192 let writer =
2193 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2194 writer.close()?;
2195
2196 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2198 let read_schema = arrow_reader.schema();
2199 assert_eq!(&schema, read_schema.as_ref());
2200 Ok(())
2201 }
2202
2203 #[test]
2204 fn test_get_arrow_schema_from_metadata() {
2205 assert!(get_arrow_schema_from_metadata("").is_err());
2206 }
2207
2208 #[test]
2209 #[cfg(feature = "arrow_canonical_extension_types")]
2210 fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2211 use arrow_schema::extension::Uuid;
2212 let arrow_schema = Schema::new(vec![
2213 Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2214 ]);
2215
2216 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2217
2218 assert_eq!(
2219 parquet_schema.column(0).logical_type(),
2220 Some(LogicalType::Uuid)
2221 );
2222
2223 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2224 assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2225
2226 Ok(())
2227 }
2228
2229 #[test]
2230 #[cfg(feature = "arrow_canonical_extension_types")]
2231 fn arrow_json_to_parquet_json() -> Result<()> {
2232 use arrow_schema::extension::Json;
2233 let arrow_schema = Schema::new(vec![
2234 Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2235 ]);
2236
2237 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2238
2239 assert_eq!(
2240 parquet_schema.column(0).logical_type(),
2241 Some(LogicalType::Json)
2242 );
2243
2244 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2245 assert_eq!(
2246 arrow_schema.field(0).try_extension_type::<Json>()?,
2247 Json::default()
2248 );
2249
2250 Ok(())
2251 }
2252}