1use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38
39use super::PARQUET_FIELD_ID_META_KEY;
40use crate::arrow::schema::extension::{
41 has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
42 logical_type_for_struct, try_add_extension_type,
43};
44use crate::arrow::ProjectionMask;
45pub(crate) use complex::{ParquetField, ParquetFieldType};
46
47pub fn parquet_to_arrow_schema(
52 parquet_schema: &SchemaDescriptor,
53 key_value_metadata: Option<&Vec<KeyValue>>,
54) -> Result<Schema> {
55 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
56}
57
58pub fn parquet_to_arrow_schema_by_columns(
61 parquet_schema: &SchemaDescriptor,
62 mask: ProjectionMask,
63 key_value_metadata: Option<&Vec<KeyValue>>,
64) -> Result<Schema> {
65 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
66}
67
68pub(crate) fn parquet_to_arrow_schema_and_fields(
74 parquet_schema: &SchemaDescriptor,
75 mask: ProjectionMask,
76 key_value_metadata: Option<&Vec<KeyValue>>,
77) -> Result<(Schema, Option<ParquetField>)> {
78 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
79 let maybe_schema = metadata
80 .remove(super::ARROW_SCHEMA_META_KEY)
81 .map(|value| get_arrow_schema_from_metadata(&value))
82 .transpose()?;
83
84 if let Some(arrow_schema) = &maybe_schema {
86 arrow_schema.metadata().iter().for_each(|(k, v)| {
87 metadata.entry(k.clone()).or_insert_with(|| v.clone());
88 });
89 }
90
91 let hint = maybe_schema.as_ref().map(|s| s.fields());
92 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
93 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
94 Ok((schema, field_levels.levels))
95}
96
/// Result of converting a Parquet schema to Arrow: the Arrow fields together
/// with the optional `ParquetField` returned by the schema conversion.
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Arrow fields of the converted schema; empty when the conversion
    /// produced no root field.
    pub(crate) fields: Fields,
    /// Root field returned by the conversion, or `None` when it produced nothing.
    pub(crate) levels: Option<ParquetField>,
}
109
110pub fn parquet_to_arrow_field_levels(
131 schema: &SchemaDescriptor,
132 mask: ProjectionMask,
133 hint: Option<&Fields>,
134) -> Result<FieldLevels> {
135 match complex::convert_schema(schema, mask, hint)? {
136 Some(field) => match &field.arrow_type {
137 DataType::Struct(fields) => Ok(FieldLevels {
138 fields: fields.clone(),
139 levels: Some(field),
140 }),
141 _ => unreachable!(),
142 },
143 None => Ok(FieldLevels {
144 fields: Fields::empty(),
145 levels: None,
146 }),
147 }
148}
149
150fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
152 let decoded = BASE64_STANDARD.decode(encoded_meta);
153 match decoded {
154 Ok(bytes) => {
155 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
156 &bytes[8..]
157 } else {
158 bytes.as_slice()
159 };
160 match arrow_ipc::root_as_message(slice) {
161 Ok(message) => message
162 .header_as_schema()
163 .map(arrow_ipc::convert::fb_to_schema)
164 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
165 Err(err) => {
166 Err(arrow_err!(
168 "Unable to get root as message stored in {}: {:?}",
169 super::ARROW_SCHEMA_META_KEY,
170 err
171 ))
172 }
173 }
174 }
175 Err(err) => {
176 Err(arrow_err!(
178 "Unable to decode the encoded schema stored in {}, {:?}",
179 super::ARROW_SCHEMA_META_KEY,
180 err
181 ))
182 }
183 }
184}
185
186pub fn encode_arrow_schema(schema: &Schema) -> String {
188 let options = writer::IpcWriteOptions::default();
189 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
190 let data_gen = writer::IpcDataGenerator::default();
191 let mut serialized_schema =
192 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
193
194 let schema_len = serialized_schema.ipc_message.len();
197 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
198 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
199 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
200 len_prefix_schema.append(&mut serialized_schema.ipc_message);
201
202 BASE64_STANDARD.encode(&len_prefix_schema)
203}
204
205pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
212 let encoded = encode_arrow_schema(schema);
213
214 let schema_kv = KeyValue {
215 key: super::ARROW_SCHEMA_META_KEY.to_string(),
216 value: Some(encoded),
217 };
218
219 let meta = props
220 .key_value_metadata
221 .get_or_insert_with(Default::default);
222
223 let schema_meta = meta
225 .iter()
226 .enumerate()
227 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
228 match schema_meta {
229 Some((i, _)) => {
230 meta.remove(i);
231 meta.push(schema_kv);
232 }
233 None => {
234 meta.push(schema_kv);
235 }
236 }
237}
238
/// Converter from an Arrow [`Schema`] to a Parquet [`SchemaDescriptor`].
///
/// Configure it with the builder-style methods, then call `convert`.
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name given to the root group of the generated Parquet schema.
    schema_root: &'a str,
    /// Flag forwarded to the per-field Arrow-to-Parquet type conversion.
    coerce_types: bool,
}
292
293impl Default for ArrowSchemaConverter<'_> {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299impl<'a> ArrowSchemaConverter<'a> {
300 pub fn new() -> Self {
302 Self {
303 schema_root: "arrow_schema",
304 coerce_types: false,
305 }
306 }
307
308 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
339 self.coerce_types = coerce_types;
340 self
341 }
342
343 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
345 self.schema_root = schema_root;
346 self
347 }
348
349 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
353 let fields = schema
354 .fields()
355 .iter()
356 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
357 .collect::<Result<_>>()?;
358 let group = Type::group_type_builder(self.schema_root)
359 .with_fields(fields)
360 .build()?;
361 Ok(SchemaDescriptor::new(Arc::new(group)))
362 }
363}
364
365fn parse_key_value_metadata(
366 key_value_metadata: Option<&Vec<KeyValue>>,
367) -> Option<HashMap<String, String>> {
368 match key_value_metadata {
369 Some(key_values) => {
370 let map: HashMap<String, String> = key_values
371 .iter()
372 .filter_map(|kv| {
373 kv.value
374 .as_ref()
375 .map(|value| (kv.key.clone(), value.clone()))
376 })
377 .collect();
378
379 if map.is_empty() {
380 None
381 } else {
382 Some(map)
383 }
384 }
385 None => None,
386 }
387}
388
389pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
391 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
392 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
393
394 let parquet_type = parquet_column.self_type();
395 let basic_info = parquet_type.get_basic_info();
396
397 let mut hash_map_size = 0;
398 if basic_info.has_id() {
399 hash_map_size += 1;
400 }
401 if has_extension_type(parquet_type) {
402 hash_map_size += 1;
403 }
404 if hash_map_size == 0 {
405 return Ok(ret);
406 }
407 ret.set_metadata(HashMap::with_capacity(hash_map_size));
408 if basic_info.has_id() {
409 ret.metadata_mut().insert(
410 PARQUET_FIELD_ID_META_KEY.to_string(),
411 basic_info.id().to_string(),
412 );
413 }
414 try_add_extension_type(ret, parquet_column.self_type())
415}
416
/// Returns the number of bytes computed for a decimal of the given `precision`.
///
/// The result is `ceil((log2(10^precision + 1) + 1) / 8)`: the bit count of
/// `10^precision + 1`, plus one extra bit (presumably for the sign), rounded
/// up to whole bytes.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let magnitude = 10.0_f64.powi(i32::from(precision));
    let bits = (magnitude + 1.0).log2() + 1.0;
    let bytes = bits / 8.0;
    bytes.ceil() as usize
}
427
/// Converts an Arrow [`Field`] into the corresponding Parquet [`Type`].
///
/// Nullable fields become `OPTIONAL`, non-nullable fields `REQUIRED`. A field
/// id found in the field metadata (see `field_id`) is attached to the
/// resulting type. Nested types (lists, structs, maps, dictionaries) are
/// converted recursively.
///
/// When `coerce_types` is true:
/// * `Date64` is written as an `INT32` date instead of a plain `INT64`,
/// * list children are renamed to `"element"`,
/// * the map entry group is named `"key_value"` and its children are renamed
///   to `"key"` and `"value"`.
///
/// # Errors
///
/// Fails for empty structs, for maps whose child is not a struct, for
/// run-end-encoded types, and whenever a Parquet type builder rejects its input.
///
/// # Panics
///
/// Panics (`unimplemented!`) for `ListView`, `LargeListView` and `Union`.
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    // Names used for nested children when `coerce_types` is enabled.
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    match field.data_type() {
        // Null is written as INT32 annotated with the `Unknown` logical type.
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Integers narrower than 32 bits and all unsigned integers carry an
        // `Integer` logical type recording their width and signedness.
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Half-precision floats are stored as a 2-byte fixed-length array.
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Second-resolution timestamps are written as plain INT64 with no
        // logical type; only milli/micro/nano units are mapped below.
        DataType::Timestamp(TimeUnit::Second, _) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // Flagged as UTC-adjusted only when a non-empty timezone is set.
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        // Seconds are handled by the preceding arm.
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Date64 becomes an INT32 date when coercing, otherwise a plain INT64.
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        // Second-resolution times are written as plain INT32 with no logical type.
        DataType::Time32(TimeUnit::Second) => {
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                // Driven by the presence of the "adjusted_to_utc" metadata key;
                // the value stored under the key is not inspected.
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                // Same metadata-key convention as for Time32 above.
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Durations are written as plain INT64 regardless of unit.
        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Every interval unit maps to a 12-byte fixed-length array with the
        // INTERVAL converted type.
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(logical_type_for_fixed_size_binary(field))
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Decimal32(precision, scale)
        | DataType::Decimal64(precision, scale)
        | DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // The precision selects the physical type: 2..=9 -> INT32, any
            // other precision up to 18 (including 0 and 1) -> INT64, larger
            // precisions -> FIXED_LEN_BYTE_ARRAY with a computed byte length.
            // A length of -1 is passed for the integer-backed cases.
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type(logical_type_for_string(field))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type(logical_type_for_string(field))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // All list flavours share the same three-level layout:
        // <name> (LIST) { repeated group list { <child> } }.
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // When coercing, the child is renamed to "element".
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // Convert every child, stopping at the first failure.
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .with_logical_type(logical_type_for_struct(field))
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // When coercing, the entry group is named "key_value";
                // otherwise the Arrow entry field's own name is kept.
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // Converts a map child, renaming it to `name` when coercing.
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                // NOTE(review): indexing assumes the entry struct has at least
                // two children (key, value); fewer would panic here.
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, ref value) => {
            // Dictionaries are converted as their value type; the rest of the
            // field (name, nullability, metadata) is kept.
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}
754
755fn field_id(field: &Field) -> Option<i32> {
756 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
757 value.parse().ok() }
759
760#[cfg(test)]
761mod tests {
762 use super::*;
763
764 use std::{collections::HashMap, sync::Arc};
765
766 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
767 use crate::file::metadata::KeyValue;
768 use crate::file::reader::FileReader;
769 use crate::{
770 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
771 schema::{parser::parse_message_type, types::SchemaDescriptor},
772 };
773 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
774
775 #[test]
776 fn test_flat_primitives() {
777 let message_type = "
778 message test_schema {
779 REQUIRED BOOLEAN boolean;
780 REQUIRED INT32 int8 (INT_8);
781 REQUIRED INT32 int16 (INT_16);
782 REQUIRED INT32 uint8 (INTEGER(8,false));
783 REQUIRED INT32 uint16 (INTEGER(16,false));
784 REQUIRED INT32 int32;
785 REQUIRED INT64 int64;
786 OPTIONAL DOUBLE double;
787 OPTIONAL FLOAT float;
788 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
789 OPTIONAL BINARY string (UTF8);
790 OPTIONAL BINARY string_2 (STRING);
791 OPTIONAL BINARY json (JSON);
792 }
793 ";
794 let parquet_group_type = parse_message_type(message_type).unwrap();
795
796 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
797 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
798
799 let arrow_fields = Fields::from(vec![
800 Field::new("boolean", DataType::Boolean, false),
801 Field::new("int8", DataType::Int8, false),
802 Field::new("int16", DataType::Int16, false),
803 Field::new("uint8", DataType::UInt8, false),
804 Field::new("uint16", DataType::UInt16, false),
805 Field::new("int32", DataType::Int32, false),
806 Field::new("int64", DataType::Int64, false),
807 Field::new("double", DataType::Float64, true),
808 Field::new("float", DataType::Float32, true),
809 Field::new("float16", DataType::Float16, true),
810 Field::new("string", DataType::Utf8, true),
811 Field::new("string_2", DataType::Utf8, true),
812 json_field(),
813 ]);
814
815 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
816 }
817
818 fn json_field() -> Field {
821 #[cfg(feature = "arrow_canonical_extension_types")]
822 {
823 Field::new("json", DataType::Utf8, true)
824 .with_extension_type(arrow_schema::extension::Json::default())
825 }
826 #[cfg(not(feature = "arrow_canonical_extension_types"))]
827 {
828 Field::new("json", DataType::Utf8, true)
829 }
830 }
831
832 #[test]
833 fn test_decimal_fields() {
834 let message_type = "
835 message test_schema {
836 REQUIRED INT32 decimal1 (DECIMAL(4,2));
837 REQUIRED INT64 decimal2 (DECIMAL(12,2));
838 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
839 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
840 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
841 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
842 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
843 }
844 ";
845
846 let parquet_group_type = parse_message_type(message_type).unwrap();
847
848 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
849 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
850
851 let arrow_fields = Fields::from(vec![
852 Field::new("decimal1", DataType::Decimal128(4, 2), false),
853 Field::new("decimal2", DataType::Decimal128(12, 2), false),
854 Field::new("decimal3", DataType::Decimal128(30, 2), false),
855 Field::new("decimal4", DataType::Decimal128(33, 2), false),
856 Field::new("decimal5", DataType::Decimal128(38, 2), false),
857 Field::new("decimal6", DataType::Decimal256(39, 2), false),
858 Field::new("decimal7", DataType::Decimal256(39, 2), false),
859 ]);
860 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
861 }
862
863 #[test]
864 fn test_byte_array_fields() {
865 let message_type = "
866 message test_schema {
867 REQUIRED BYTE_ARRAY binary;
868 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
869 }
870 ";
871
872 let parquet_group_type = parse_message_type(message_type).unwrap();
873
874 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
875 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
876
877 let arrow_fields = Fields::from(vec![
878 Field::new("binary", DataType::Binary, false),
879 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
880 ]);
881 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
882 }
883
884 #[test]
885 fn test_duplicate_fields() {
886 let message_type = "
887 message test_schema {
888 REQUIRED BOOLEAN boolean;
889 REQUIRED INT32 int8 (INT_8);
890 }
891 ";
892
893 let parquet_group_type = parse_message_type(message_type).unwrap();
894
895 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
896 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
897
898 let arrow_fields = Fields::from(vec![
899 Field::new("boolean", DataType::Boolean, false),
900 Field::new("int8", DataType::Int8, false),
901 ]);
902 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
903
904 let converted_arrow_schema =
905 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
906 .unwrap();
907 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
908 }
909
910 #[test]
911 fn test_parquet_lists() {
912 let mut arrow_fields = Vec::new();
913
914 let message_type = "
916 message test_schema {
917 REQUIRED GROUP my_list (LIST) {
918 REPEATED GROUP list {
919 OPTIONAL BINARY element (UTF8);
920 }
921 }
922 OPTIONAL GROUP my_list (LIST) {
923 REPEATED GROUP list {
924 REQUIRED BINARY element (UTF8);
925 }
926 }
927 OPTIONAL GROUP array_of_arrays (LIST) {
928 REPEATED GROUP list {
929 REQUIRED GROUP element (LIST) {
930 REPEATED GROUP list {
931 REQUIRED INT32 element;
932 }
933 }
934 }
935 }
936 OPTIONAL GROUP my_list (LIST) {
937 REPEATED GROUP element {
938 REQUIRED BINARY str (UTF8);
939 }
940 }
941 OPTIONAL GROUP my_list (LIST) {
942 REPEATED INT32 element;
943 }
944 OPTIONAL GROUP my_list (LIST) {
945 REPEATED GROUP element {
946 REQUIRED BINARY str (UTF8);
947 REQUIRED INT32 num;
948 }
949 }
950 OPTIONAL GROUP my_list (LIST) {
951 REPEATED GROUP array {
952 REQUIRED BINARY str (UTF8);
953 }
954
955 }
956 OPTIONAL GROUP my_list (LIST) {
957 REPEATED GROUP my_list_tuple {
958 REQUIRED BINARY str (UTF8);
959 }
960 }
961 REPEATED INT32 name;
962 }
963 ";
964
965 {
972 arrow_fields.push(Field::new_list(
973 "my_list",
974 Field::new("element", DataType::Utf8, true),
975 false,
976 ));
977 }
978
979 {
986 arrow_fields.push(Field::new_list(
987 "my_list",
988 Field::new("element", DataType::Utf8, false),
989 true,
990 ));
991 }
992
993 {
1006 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1007 arrow_fields.push(Field::new_list(
1008 "array_of_arrays",
1009 Field::new_list("element", arrow_inner_list, false),
1010 true,
1011 ));
1012 }
1013
1014 {
1021 arrow_fields.push(Field::new_list(
1022 "my_list",
1023 Field::new("str", DataType::Utf8, false),
1024 true,
1025 ));
1026 }
1027
1028 {
1033 arrow_fields.push(Field::new_list(
1034 "my_list",
1035 Field::new("element", DataType::Int32, false),
1036 true,
1037 ));
1038 }
1039
1040 {
1048 let fields = vec![
1049 Field::new("str", DataType::Utf8, false),
1050 Field::new("num", DataType::Int32, false),
1051 ];
1052 arrow_fields.push(Field::new_list(
1053 "my_list",
1054 Field::new_struct("element", fields, false),
1055 true,
1056 ));
1057 }
1058
1059 {
1067 let fields = vec![Field::new("str", DataType::Utf8, false)];
1068 arrow_fields.push(Field::new_list(
1069 "my_list",
1070 Field::new_struct("array", fields, false),
1071 true,
1072 ));
1073 }
1074
1075 {
1083 let fields = vec![Field::new("str", DataType::Utf8, false)];
1084 arrow_fields.push(Field::new_list(
1085 "my_list",
1086 Field::new_struct("my_list_tuple", fields, false),
1087 true,
1088 ));
1089 }
1090
1091 {
1094 arrow_fields.push(Field::new_list(
1095 "name",
1096 Field::new("name", DataType::Int32, false),
1097 false,
1098 ));
1099 }
1100
1101 let parquet_group_type = parse_message_type(message_type).unwrap();
1102
1103 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1104 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1105 let converted_fields = converted_arrow_schema.fields();
1106
1107 assert_eq!(arrow_fields.len(), converted_fields.len());
1108 for i in 0..arrow_fields.len() {
1109 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1110 }
1111 }
1112
1113 #[test]
1114 fn test_parquet_list_nullable() {
1115 let mut arrow_fields = Vec::new();
1116
1117 let message_type = "
1118 message test_schema {
1119 REQUIRED GROUP my_list1 (LIST) {
1120 REPEATED GROUP list {
1121 OPTIONAL BINARY element (UTF8);
1122 }
1123 }
1124 OPTIONAL GROUP my_list2 (LIST) {
1125 REPEATED GROUP list {
1126 REQUIRED BINARY element (UTF8);
1127 }
1128 }
1129 REQUIRED GROUP my_list3 (LIST) {
1130 REPEATED GROUP list {
1131 REQUIRED BINARY element (UTF8);
1132 }
1133 }
1134 }
1135 ";
1136
1137 {
1144 arrow_fields.push(Field::new_list(
1145 "my_list1",
1146 Field::new("element", DataType::Utf8, true),
1147 false,
1148 ));
1149 }
1150
1151 {
1158 arrow_fields.push(Field::new_list(
1159 "my_list2",
1160 Field::new("element", DataType::Utf8, false),
1161 true,
1162 ));
1163 }
1164
1165 {
1172 arrow_fields.push(Field::new_list(
1173 "my_list3",
1174 Field::new("element", DataType::Utf8, false),
1175 false,
1176 ));
1177 }
1178
1179 let parquet_group_type = parse_message_type(message_type).unwrap();
1180
1181 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1182 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1183 let converted_fields = converted_arrow_schema.fields();
1184
1185 assert_eq!(arrow_fields.len(), converted_fields.len());
1186 for i in 0..arrow_fields.len() {
1187 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1188 }
1189 }
1190
1191 #[test]
1192 fn test_parquet_maps() {
1193 let mut arrow_fields = Vec::new();
1194
1195 let message_type = "
1197 message test_schema {
1198 REQUIRED group my_map1 (MAP) {
1199 REPEATED group key_value {
1200 REQUIRED binary key (UTF8);
1201 OPTIONAL int32 value;
1202 }
1203 }
1204 OPTIONAL group my_map2 (MAP) {
1205 REPEATED group map {
1206 REQUIRED binary str (UTF8);
1207 REQUIRED int32 num;
1208 }
1209 }
1210 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1211 REPEATED group map {
1212 REQUIRED binary key (UTF8);
1213 OPTIONAL int32 value;
1214 }
1215 }
1216 REQUIRED group my_map4 (MAP) {
1217 REPEATED group map {
1218 OPTIONAL binary key (UTF8);
1219 REQUIRED int32 value;
1220 }
1221 }
1222 }
1223 ";
1224
1225 {
1233 arrow_fields.push(Field::new_map(
1234 "my_map1",
1235 "key_value",
1236 Field::new("key", DataType::Utf8, false),
1237 Field::new("value", DataType::Int32, true),
1238 false,
1239 false,
1240 ));
1241 }
1242
1243 {
1251 arrow_fields.push(Field::new_map(
1252 "my_map2",
1253 "map",
1254 Field::new("str", DataType::Utf8, false),
1255 Field::new("num", DataType::Int32, false),
1256 false,
1257 true,
1258 ));
1259 }
1260
1261 {
1269 arrow_fields.push(Field::new_map(
1270 "my_map3",
1271 "map",
1272 Field::new("key", DataType::Utf8, false),
1273 Field::new("value", DataType::Int32, true),
1274 false,
1275 true,
1276 ));
1277 }
1278
1279 {
1287 arrow_fields.push(Field::new_map(
1288 "my_map4",
1289 "map",
1290 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1292 false,
1293 false,
1294 ));
1295 }
1296
1297 let parquet_group_type = parse_message_type(message_type).unwrap();
1298
1299 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1300 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1301 let converted_fields = converted_arrow_schema.fields();
1302
1303 assert_eq!(arrow_fields.len(), converted_fields.len());
1304 for i in 0..arrow_fields.len() {
1305 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1306 }
1307 }
1308
1309 #[test]
1310 fn test_nested_schema() {
1311 let mut arrow_fields = Vec::new();
1312 {
1313 let group1_fields = Fields::from(vec![
1314 Field::new("leaf1", DataType::Boolean, false),
1315 Field::new("leaf2", DataType::Int32, false),
1316 ]);
1317 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1318 arrow_fields.push(group1_struct);
1319
1320 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1321 arrow_fields.push(leaf3_field);
1322 }
1323
1324 let message_type = "
1325 message test_schema {
1326 REQUIRED GROUP group1 {
1327 REQUIRED BOOLEAN leaf1;
1328 REQUIRED INT32 leaf2;
1329 }
1330 REQUIRED INT64 leaf3;
1331 }
1332 ";
1333 let parquet_group_type = parse_message_type(message_type).unwrap();
1334
1335 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1336 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1337 let converted_fields = converted_arrow_schema.fields();
1338
1339 assert_eq!(arrow_fields.len(), converted_fields.len());
1340 for i in 0..arrow_fields.len() {
1341 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1342 }
1343 }
1344
/// Projecting leaves 3, 0 and 4 (leaf 4 listed twice) is expected to yield each group
/// reduced to its single selected child, plus the top-level `leaf5`.
#[test]
fn test_nested_schema_partial() {
    let expected = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // The index list is deliberately unordered and contains a duplicate.
    let mask = ProjectionMask::leaves(&descriptor, [3, 0, 4, 4]);
    let converted = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
    let actual = converted.fields();

    assert_eq!(expected.len(), actual.len());
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1395
/// The projected Arrow schema is expected to be the same whether the mask is built from
/// out-of-order leaf indices or from out-of-order column paths.
#[test]
fn test_nested_schema_partial_ordering() {
    let expected = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // Both masks name the same three leaves in the same scrambled order:
    // first by index, then by dotted column path.
    let masks = [
        ProjectionMask::leaves(&descriptor, [3, 0, 4]),
        ProjectionMask::columns(&descriptor, ["group2.leaf4", "group1.leaf1", "leaf5"]),
    ];

    for mask in masks {
        let converted = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
        let actual = converted.fields();

        assert_eq!(expected.len(), actual.len());
        for (want, got) in expected.iter().zip(actual.iter()) {
            assert_eq!(want, got.as_ref());
        }
    }
}
1457
/// `REPEATED GROUP`s without a LIST annotation are expected to convert to non-nullable
/// lists whose element is a non-nullable struct carrying the group's own name.
#[test]
fn test_repeated_nested_schema() {
    let inner_list = Field::new_list(
        "innerGroup",
        Field::new_struct(
            "innerGroup",
            vec![Field::new("leaf3", DataType::Int32, true)],
            false,
        ),
        false,
    );
    let outer_list = Field::new_list(
        "outerGroup",
        Field::new_struct(
            "outerGroup",
            vec![Field::new("leaf2", DataType::Int32, true), inner_list],
            false,
        ),
        false,
    );
    let expected = vec![Field::new("leaf1", DataType::Int32, true), outer_list];

    let message_type = "
    message test_schema {
        OPTIONAL INT32 leaf1;
        REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
                OPTIONAL INT32 leaf3;
            }
        }
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual = converted.fields();

    assert_eq!(expected.len(), actual.len());
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1508
/// Runs `parquet_to_arrow_field` over every leaf column of a flat schema and compares the
/// resulting Arrow fields, in column order, against a hand-written expectation.
#[test]
fn test_column_desc_to_field() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 uint8 (INTEGER(8,false));
        REQUIRED INT32 int16 (INT_16);
        REQUIRED INT32 uint16 (INTEGER(16,false));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (UTF8);
        REPEATED BOOLEAN bools;
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME_MILLIS);
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_nano (TIME(NANOS,false));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
        REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
        REPEATED INT32 int_list;
        REPEATED BINARY byte_list;
        REPEATED BINARY string_list (UTF8);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    let converted: Vec<Field> = descriptor
        .columns()
        .iter()
        .map(|column| parquet_to_arrow_field(column).unwrap())
        .collect();

    // Expected shape for a REPEATED primitive: a non-nullable list whose non-nullable
    // element reuses the column name.
    let repeated = |name: &str, element: DataType| {
        Field::new_list(name, Field::new(name, element, false), false)
    };
    // All timestamp columns in this schema are expected to come back tagged with "UTC".
    let utc_timestamp = |name: &str, unit: TimeUnit, nullable: bool| {
        Field::new(
            name,
            DataType::Timestamp(unit, Some("UTC".into())),
            nullable,
        )
    };

    let expected = vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        repeated("bools", DataType::Boolean),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
        utc_timestamp("ts_milli", TimeUnit::Millisecond, true),
        utc_timestamp("ts_micro", TimeUnit::Microsecond, false),
        utc_timestamp("ts_nano", TimeUnit::Nanosecond, false),
        repeated("int_list", DataType::Int32),
        repeated("byte_list", DataType::Binary),
        repeated("string_list", DataType::Utf8),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
    ];

    assert_eq!(expected, converted);
}
1607
/// Converts an Arrow schema whose list/map children use non-default names, once with type
/// coercion enabled and once without, and checks that the number of leaf columns matches
/// a hand-written Parquet schema in each case.
#[test]
fn test_coerced_map_list() {
    let arrow_schema = Schema::new(vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ]);

    // Number of leaf columns in a textual Parquet schema.
    let leaf_count = |message: &str| {
        SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()))
            .columns()
            .len()
    };
    // Number of leaf columns produced by converting `arrow_schema`.
    let converted_leaf_count = |coerce: bool| {
        ArrowSchemaConverter::new()
            .with_coerce_types(coerce)
            .convert(&arrow_schema)
            .unwrap()
            .columns()
            .len()
    };

    // Reference schema using the `list`/`element` and `key_value`/`key`/`value` names.
    let coerced_message = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP key_value {
                REQUIRED BINARY key (STRING);
                OPTIONAL INT32 value;
            }
        }
    }
    ";
    assert_eq!(leaf_count(coerced_message), converted_leaf_count(true));

    // Reference schema keeping the child names given on the Arrow side.
    let verbatim_message = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN item;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP entries {
                REQUIRED BINARY keys (STRING);
                OPTIONAL INT32 values;
            }
        }
    }
    ";
    assert_eq!(leaf_count(verbatim_message), converted_leaf_count(false));
}
1682
/// Converts a broad Arrow schema to Parquet with the default `ArrowSchemaConverter` and
/// compares the resulting leaf columns, position by position, with a hand-written
/// Parquet message type.
#[test]
fn test_field_to_column_desc() {
    let message_type = "
    message arrow_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 int16 (INTEGER(16,true));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (STRING);
        OPTIONAL GROUP bools (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        REQUIRED GROUP bools_non_null (LIST) {
            REPEATED GROUP list {
                REQUIRED BOOLEAN element;
            }
        }
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME(MILLIS,false));
        OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
        REQUIRED INT64 ts_seconds;
        REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
        REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
        REQUIRED GROUP struct {
            REQUIRED BOOLEAN bools;
            REQUIRED INT32 uint32 (INTEGER(32,false));
            REQUIRED GROUP int32 (LIST) {
                REPEATED GROUP list {
                    OPTIONAL INT32 element;
                }
            }
        }
        REQUIRED BINARY dictionary_strings (STRING);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let reference = SchemaDescriptor::new(Arc::new(root));

    // Field metadata attached to the two `*_utc` time columns: the key is present with an
    // empty value.
    let adjusted_to_utc =
        || HashMap::from([("adjusted_to_utc".to_string(), "".to_string())]);

    let arrow_schema = Schema::new(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("element", DataType::Boolean, true),
            true,
        ),
        Field::new_list(
            "bools_non_null",
            Field::new("element", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new(
            "time_milli_utc",
            DataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new(
            "time_micro_utc",
            DataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new(
            "ts_milli",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "ts_micro",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
        Field::new(
            "ts_seconds",
            DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_micro_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_negative_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
            false,
        ),
        Field::new(
            "ts_micro_non_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
            false,
        ),
        Field::new_struct(
            "struct",
            vec![
                Field::new("bools", DataType::Boolean, false),
                Field::new("uint32", DataType::UInt32, false),
                Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
            ],
            false,
        ),
        Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        Field::new("decimal128", DataType::Decimal128(38, 2), false),
        Field::new("decimal256", DataType::Decimal256(39, 2), false),
    ]);
    let converted = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

    assert_eq!(reference.columns().len(), converted.columns().len());

    for (expected, actual) in reference.columns().iter().zip(converted.columns()) {
        if expected.logical_type().is_some() {
            // The hand-written column carries a logical type: demand full equality.
            assert_eq!(expected, actual);
        } else {
            // No logical type on the reference side: compare only name, physical type and
            // converted type (presumably because the converter may attach a logical type
            // the reference schema omits).
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.physical_type(), actual.physical_type());
            assert_eq!(expected.converted_type(), actual.converted_type());
        }
    }
}
1859
/// Converting a schema that contains a struct with no children must fail; unwrapping the
/// result is expected to panic with the "empty structs" message.
#[test]
#[should_panic(expected = "Parquet does not support writing empty structs")]
fn test_empty_struct_field() {
    let empty_struct = Field::new("struct", DataType::Struct(Fields::empty()), false);
    let arrow_schema = Schema::new(vec![empty_struct]);

    let conversion = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema);

    // The panic asserted by `should_panic` comes from this unwrap.
    conversion.unwrap();
}
1875
/// Key/value metadata handed to `parquet_to_arrow_schema` is expected to show up on the
/// Arrow schema, with the entry whose value is `None` left out.
#[test]
fn test_metadata() {
    let message_type = "
    message test_schema {
        OPTIONAL BINARY string (STRING);
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    let key_value_metadata = vec![
        KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
        KeyValue::new("baz".to_owned(), None),
    ];

    // Only "foo" has a value, so only "foo" is expected to survive.
    let expected_metadata: HashMap<String, String> =
        HashMap::from([("foo".to_owned(), "bar".to_owned())]);

    let converted = parquet_to_arrow_schema(&descriptor, Some(&key_value_metadata)).unwrap();

    assert_eq!(converted.metadata(), &expected_metadata);
}
1899
/// Writes a Parquet file (no record batches) for a wide Arrow schema, reads the Arrow schema
/// back and expects it to equal the original. Afterwards walks the Parquet schema tree and
/// checks exactly which nodes carry a field id, and which id.
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
    // Builds an owned metadata map from borrowed key/value pairs.
    let kv = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(pairs.len());
        for (key, value) in pairs {
            map.insert(key.to_string(), value.to_string());
        }
        map
    };
    // Metadata map holding nothing but a Parquet field id.
    let field_id = |id: &str| kv(&[(PARQUET_FIELD_ID_META_KEY, id)]);

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, false)
                .with_metadata(kv(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
            Field::new("c2", DataType::Binary, false),
            Field::new("c3", DataType::FixedSizeBinary(3), false),
            Field::new("c4", DataType::Boolean, false),
            Field::new("c5", DataType::Date32, false),
            Field::new("c6", DataType::Date64, false),
            Field::new("c7", DataType::Time32(TimeUnit::Second), false),
            Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
            Field::new(
                "c16",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "c17",
                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                false,
            ),
            Field::new(
                "c18",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new_list(
                "c21",
                Field::new_list_field(DataType::Boolean, true)
                    .with_metadata(kv(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                false,
            )
            .with_metadata(field_id("4")),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new_list_field(DataType::Boolean, true)),
                    5,
                ),
                false,
            ),
            Field::new_list(
                "c23",
                Field::new_large_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(Fields::from(vec![
                            Field::new("a", DataType::Int16, true),
                            Field::new("b", DataType::Float64, false),
                            Field::new("c", DataType::Float32, false),
                            Field::new("d", DataType::Float16, false),
                        ])),
                        false,
                    ),
                    true,
                ),
                false,
            ),
            Field::new(
                "c24",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::Utf8, false),
                    Field::new("b", DataType::UInt16, false),
                ])),
                false,
            ),
            Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
            Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
            // `Field::new_dict` is marked deprecated; the lint is silenced so the test can
            // keep constructing a dictionary field through it.
            #[allow(deprecated)]
            Field::new_dict(
                "c31",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
                123,
                true,
            )
            .with_metadata(field_id("6")),
            Field::new("c32", DataType::LargeBinary, true),
            Field::new("c33", DataType::LargeUtf8, true),
            Field::new_large_list(
                "c34",
                Field::new_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(Fields::from(vec![
                            Field::new("a", DataType::Int16, true),
                            Field::new("b", DataType::Float64, true),
                        ])),
                        true,
                    ),
                    true,
                ),
                true,
            ),
            Field::new("c35", DataType::Null, true),
            Field::new("c36", DataType::Decimal128(2, 1), false),
            Field::new("c37", DataType::Decimal256(50, 20), false),
            Field::new("c38", DataType::Decimal128(18, 12), true),
            Field::new_map(
                "c39",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                false,
                true,
            ),
            Field::new_map(
                "c40",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false).with_metadata(field_id("8")),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true).with_metadata(field_id("10")),
                    true,
                )
                .with_metadata(field_id("9")),
                false,
                true,
            )
            .with_metadata(field_id("7")),
            Field::new_map(
                "c41",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true).with_metadata(field_id("11")),
                    true,
                ),
                false,
                false,
            ),
            Field::new("c42", DataType::Decimal32(5, 2), false),
            Field::new("c43", DataType::Decimal64(18, 12), true),
        ],
        kv(&[("Key", "Value")]),
    );

    // Write through a cloned handle and close immediately, without writing any batch.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    // Read back through the original handle; the Arrow schema must round-trip unchanged.
    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, arrow_reader.schema().as_ref());

    // Depth-first walk over the Parquet schema, recording "<dotted path> -> <id>" for every
    // node that has a field id.
    let root = arrow_reader.parquet_schema().root_schema_ptr();
    let mut pending = Vec::with_capacity(10);
    let mut found = Vec::with_capacity(10);
    pending.push((root.name().to_string(), root));

    while let Some((path, node)) = pending.pop() {
        if node.is_group() {
            pending.extend(
                node.get_fields()
                    .iter()
                    .map(|child| (format!("{path}.{}", child.name()), child.clone())),
            );
        }

        let info = node.get_basic_info();
        if info.has_id() {
            found.push(format!("{path} -> {}", info.id()));
        }
    }

    // Sort so the comparison does not depend on traversal order.
    found.sort_unstable();
    let found: Vec<&str> = found.iter().map(String::as_str).collect();

    let expected = [
        "arrow_schema.c1 -> 2",
        "arrow_schema.c21 -> 4",
        "arrow_schema.c21.list.item -> 5",
        "arrow_schema.c31 -> 6",
        "arrow_schema.c40 -> 7",
        "arrow_schema.c40.my_entries.my_key -> 8",
        "arrow_schema.c40.my_entries.my_value -> 9",
        "arrow_schema.c40.my_entries.my_value.list.item -> 10",
        "arrow_schema.c41.my_entries.my_value.list.item -> 11",
    ];
    assert_eq!(&found, &expected);

    Ok(())
}
2117
/// Field ids written through Arrow field metadata are expected to survive the chain
/// Parquet bytes -> schema descriptor -> Arrow schema (with no key/value metadata supplied)
/// -> Parquet schema.
#[test]
fn test_read_parquet_field_ids_raw() -> Result<()> {
    // Metadata map holding nothing but a Parquet field id.
    let field_id = |id: &str| -> HashMap<String, String> {
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
    };

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, true).with_metadata(field_id("1")),
            Field::new("c2", DataType::Utf8, true).with_metadata(field_id("2")),
        ],
        HashMap::new(),
    );

    // Produce an in-memory Parquet file that contains only the schema.
    let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
    let parquet_bytes = writer.into_inner()?;

    let file_reader =
        crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
    let schema_descriptor = file_reader.metadata().file_metadata().schema_descr_ptr();

    // `None` here means the Arrow schema is derived from the Parquet schema alone.
    let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

    let reconverted = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)?;
    let top_level = reconverted.root_schema().get_fields();

    assert_eq!(top_level.len(), 2);
    assert_eq!(top_level[0].get_basic_info().id(), 1);
    assert_eq!(top_level[1].get_basic_info().id(), 2);

    Ok(())
}
2155
/// Round-trips a schema made of list-like fields with custom child names ("array",
/// "items") through a Parquet file and expects the Arrow schema to come back unchanged.
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let metadata: HashMap<String, String> =
        HashMap::from([("Key".to_string(), "Value".to_string())]);

    let nested_items = Field::new_large_list(
        "items",
        Field::new_struct(
            "items",
            vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Float64, false),
            ],
            true,
        ),
        true,
    );

    let schema = Schema::new_with_metadata(
        vec![
            Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new("items", DataType::Boolean, false)),
                    5,
                ),
                false,
            ),
            Field::new_list("c23", nested_items, true),
        ],
        metadata,
    );

    // Write through a cloned handle and close immediately, without writing any batch.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, arrow_reader.schema().as_ref());

    Ok(())
}
2206
/// An empty encoded-schema string must be rejected with an error.
#[test]
fn test_get_arrow_schema_from_metadata() {
    let decoded = get_arrow_schema_from_metadata("");
    assert!(decoded.is_err());
}
2211
/// A `FixedSizeBinary(16)` field tagged with the `Uuid` extension type is expected to map
/// to the Parquet `Uuid` logical type, and to regain the extension type on the way back.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    use arrow_schema::extension::Uuid;

    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let arrow_schema = Schema::new(vec![uuid_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();
    assert_eq!(logical_type, Some(LogicalType::Uuid));

    // Parquet -> Arrow
    let round_tripped = parquet_to_arrow_schema(&parquet_schema, None)?;
    let extension = round_tripped.field(0).try_extension_type::<Uuid>()?;
    assert_eq!(extension, Uuid);

    Ok(())
}
2235
/// A `Utf8` field tagged with the `Json` extension type is expected to map to the Parquet
/// `Json` logical type, and to regain the extension type on the way back.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    use arrow_schema::extension::Json;

    let json_field =
        Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let arrow_schema = Schema::new(vec![json_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();
    assert_eq!(logical_type, Some(LogicalType::Json));

    // Parquet -> Arrow
    let round_tripped = parquet_to_arrow_schema(&parquet_schema, None)?;
    let extension = round_tripped.field(0).try_extension_type::<Json>()?;
    assert_eq!(extension, Json::default());

    Ok(())
}
2259}