1use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26#[cfg(feature = "arrow_canonical_extension_types")]
27use arrow_schema::extension::{Json, Uuid};
28use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
29
30use crate::basic::{
31 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
32};
33use crate::errors::{ParquetError, Result};
34use crate::file::{metadata::KeyValue, properties::WriterProperties};
35use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
36
37mod complex;
38mod primitive;
39
40use crate::arrow::ProjectionMask;
41pub(crate) use complex::{ParquetField, ParquetFieldType};
42
43use super::PARQUET_FIELD_ID_META_KEY;
44
45pub fn parquet_to_arrow_schema(
50 parquet_schema: &SchemaDescriptor,
51 key_value_metadata: Option<&Vec<KeyValue>>,
52) -> Result<Schema> {
53 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
54}
55
56pub fn parquet_to_arrow_schema_by_columns(
59 parquet_schema: &SchemaDescriptor,
60 mask: ProjectionMask,
61 key_value_metadata: Option<&Vec<KeyValue>>,
62) -> Result<Schema> {
63 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
64}
65
66pub(crate) fn parquet_to_arrow_schema_and_fields(
68 parquet_schema: &SchemaDescriptor,
69 mask: ProjectionMask,
70 key_value_metadata: Option<&Vec<KeyValue>>,
71) -> Result<(Schema, Option<ParquetField>)> {
72 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
73 let maybe_schema = metadata
74 .remove(super::ARROW_SCHEMA_META_KEY)
75 .map(|value| get_arrow_schema_from_metadata(&value))
76 .transpose()?;
77
78 if let Some(arrow_schema) = &maybe_schema {
80 arrow_schema.metadata().iter().for_each(|(k, v)| {
81 metadata.entry(k.clone()).or_insert_with(|| v.clone());
82 });
83 }
84
85 let hint = maybe_schema.as_ref().map(|s| s.fields());
86 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
87 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
88 Ok((schema, field_levels.levels))
89}
90
/// Arrow fields produced by converting a Parquet schema, together with the converted
/// root [`ParquetField`].
///
/// Returned by [`parquet_to_arrow_field_levels`].
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// The top-level Arrow fields, i.e. the children of the converted root struct.
    pub(crate) fields: Fields,
    /// The converted root field, or `None` when the conversion produced no root
    /// (presumably an empty projection — confirm in `complex::convert_schema`).
    pub(crate) levels: Option<ParquetField>,
}
103
104pub fn parquet_to_arrow_field_levels(
125 schema: &SchemaDescriptor,
126 mask: ProjectionMask,
127 hint: Option<&Fields>,
128) -> Result<FieldLevels> {
129 match complex::convert_schema(schema, mask, hint)? {
130 Some(field) => match &field.arrow_type {
131 DataType::Struct(fields) => Ok(FieldLevels {
132 fields: fields.clone(),
133 levels: Some(field),
134 }),
135 _ => unreachable!(),
136 },
137 None => Ok(FieldLevels {
138 fields: Fields::empty(),
139 levels: None,
140 }),
141 }
142}
143
144fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
146 let decoded = BASE64_STANDARD.decode(encoded_meta);
147 match decoded {
148 Ok(bytes) => {
149 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
150 &bytes[8..]
151 } else {
152 bytes.as_slice()
153 };
154 match arrow_ipc::root_as_message(slice) {
155 Ok(message) => message
156 .header_as_schema()
157 .map(arrow_ipc::convert::fb_to_schema)
158 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
159 Err(err) => {
160 Err(arrow_err!(
162 "Unable to get root as message stored in {}: {:?}",
163 super::ARROW_SCHEMA_META_KEY,
164 err
165 ))
166 }
167 }
168 }
169 Err(err) => {
170 Err(arrow_err!(
172 "Unable to decode the encoded schema stored in {}, {:?}",
173 super::ARROW_SCHEMA_META_KEY,
174 err
175 ))
176 }
177 }
178}
179
180pub fn encode_arrow_schema(schema: &Schema) -> String {
182 let options = writer::IpcWriteOptions::default();
183 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
184 let data_gen = writer::IpcDataGenerator::default();
185 let mut serialized_schema =
186 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
187
188 let schema_len = serialized_schema.ipc_message.len();
191 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
192 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
193 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
194 len_prefix_schema.append(&mut serialized_schema.ipc_message);
195
196 BASE64_STANDARD.encode(&len_prefix_schema)
197}
198
199pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
206 let encoded = encode_arrow_schema(schema);
207
208 let schema_kv = KeyValue {
209 key: super::ARROW_SCHEMA_META_KEY.to_string(),
210 value: Some(encoded),
211 };
212
213 let meta = props
214 .key_value_metadata
215 .get_or_insert_with(Default::default);
216
217 let schema_meta = meta
219 .iter()
220 .enumerate()
221 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
222 match schema_meta {
223 Some((i, _)) => {
224 meta.remove(i);
225 meta.push(schema_kv);
226 }
227 None => {
228 meta.push(schema_kv);
229 }
230 }
231}
232
/// Converts an Arrow [`Schema`] into a Parquet [`SchemaDescriptor`].
///
/// Create one with [`ArrowSchemaConverter::new`] and optionally configure it with
/// [`ArrowSchemaConverter::with_coerce_types`] and [`ArrowSchemaConverter::schema_root`]
/// before calling [`ArrowSchemaConverter::convert`].
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root Parquet group; `new` sets it to `"arrow_schema"`.
    schema_root: &'a str,
    /// Whether to coerce Arrow names/types to Parquet-spec equivalents during conversion
    /// (passed to `arrow_to_parquet_type`); `new` sets it to `false`.
    coerce_types: bool,
}
286
287impl Default for ArrowSchemaConverter<'_> {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
293impl<'a> ArrowSchemaConverter<'a> {
294 pub fn new() -> Self {
296 Self {
297 schema_root: "arrow_schema",
298 coerce_types: false,
299 }
300 }
301
302 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
333 self.coerce_types = coerce_types;
334 self
335 }
336
337 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
339 self.schema_root = schema_root;
340 self
341 }
342
343 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
347 let fields = schema
348 .fields()
349 .iter()
350 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
351 .collect::<Result<_>>()?;
352 let group = Type::group_type_builder(self.schema_root)
353 .with_fields(fields)
354 .build()?;
355 Ok(SchemaDescriptor::new(Arc::new(group)))
356 }
357}
358
359fn parse_key_value_metadata(
360 key_value_metadata: Option<&Vec<KeyValue>>,
361) -> Option<HashMap<String, String>> {
362 match key_value_metadata {
363 Some(key_values) => {
364 let map: HashMap<String, String> = key_values
365 .iter()
366 .filter_map(|kv| {
367 kv.value
368 .as_ref()
369 .map(|value| (kv.key.clone(), value.clone()))
370 })
371 .collect();
372
373 if map.is_empty() {
374 None
375 } else {
376 Some(map)
377 }
378 }
379 None => None,
380 }
381}
382
383pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
385 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
386 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
387
388 let basic_info = parquet_column.self_type().get_basic_info();
389 let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
390 2
391 } else {
392 1
393 });
394 if basic_info.has_id() {
395 meta.insert(
396 PARQUET_FIELD_ID_META_KEY.to_string(),
397 basic_info.id().to_string(),
398 );
399 }
400 #[cfg(feature = "arrow_canonical_extension_types")]
401 if let Some(logical_type) = basic_info.logical_type() {
402 match logical_type {
403 LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
404 LogicalType::Json => ret.try_with_extension_type(Json::default())?,
405 _ => {}
406 }
407 }
408 if !meta.is_empty() {
409 ret.set_metadata(meta);
410 }
411
412 Ok(ret)
413}
414
/// Returns the number of bytes a `FIXED_LEN_BYTE_ARRAY` needs to hold a decimal with the
/// given `precision` (number of decimal digits).
///
/// Computed as `ceil((log2(10^precision + 1) + 1) / 8)`; the extra bit accounts for the
/// sign.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let max_magnitude = 10.0_f64.powi(i32::from(precision));
    let bits_with_sign = (max_magnitude + 1.0).log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
425
426fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
428 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
429 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
430 const PARQUET_KEY_FIELD_NAME: &str = "key";
431 const PARQUET_VALUE_FIELD_NAME: &str = "value";
432
433 let name = field.name().as_str();
434 let repetition = if field.is_nullable() {
435 Repetition::OPTIONAL
436 } else {
437 Repetition::REQUIRED
438 };
439 let id = field_id(field);
440 match field.data_type() {
442 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
443 .with_logical_type(Some(LogicalType::Unknown))
444 .with_repetition(repetition)
445 .with_id(id)
446 .build(),
447 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
448 .with_repetition(repetition)
449 .with_id(id)
450 .build(),
451 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
452 .with_logical_type(Some(LogicalType::Integer {
453 bit_width: 8,
454 is_signed: true,
455 }))
456 .with_repetition(repetition)
457 .with_id(id)
458 .build(),
459 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
460 .with_logical_type(Some(LogicalType::Integer {
461 bit_width: 16,
462 is_signed: true,
463 }))
464 .with_repetition(repetition)
465 .with_id(id)
466 .build(),
467 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
468 .with_repetition(repetition)
469 .with_id(id)
470 .build(),
471 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
472 .with_repetition(repetition)
473 .with_id(id)
474 .build(),
475 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
476 .with_logical_type(Some(LogicalType::Integer {
477 bit_width: 8,
478 is_signed: false,
479 }))
480 .with_repetition(repetition)
481 .with_id(id)
482 .build(),
483 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
484 .with_logical_type(Some(LogicalType::Integer {
485 bit_width: 16,
486 is_signed: false,
487 }))
488 .with_repetition(repetition)
489 .with_id(id)
490 .build(),
491 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
492 .with_logical_type(Some(LogicalType::Integer {
493 bit_width: 32,
494 is_signed: false,
495 }))
496 .with_repetition(repetition)
497 .with_id(id)
498 .build(),
499 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
500 .with_logical_type(Some(LogicalType::Integer {
501 bit_width: 64,
502 is_signed: false,
503 }))
504 .with_repetition(repetition)
505 .with_id(id)
506 .build(),
507 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
508 .with_repetition(repetition)
509 .with_id(id)
510 .with_logical_type(Some(LogicalType::Float16))
511 .with_length(2)
512 .build(),
513 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
514 .with_repetition(repetition)
515 .with_id(id)
516 .build(),
517 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
518 .with_repetition(repetition)
519 .with_id(id)
520 .build(),
521 DataType::Timestamp(TimeUnit::Second, _) => {
522 Type::primitive_type_builder(name, PhysicalType::INT64)
524 .with_repetition(repetition)
525 .with_id(id)
526 .build()
527 }
528 DataType::Timestamp(time_unit, tz) => {
529 Type::primitive_type_builder(name, PhysicalType::INT64)
530 .with_logical_type(Some(LogicalType::Timestamp {
531 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
533 unit: match time_unit {
534 TimeUnit::Second => unreachable!(),
535 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
536 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
537 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
538 },
539 }))
540 .with_repetition(repetition)
541 .with_id(id)
542 .build()
543 }
544 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
545 .with_logical_type(Some(LogicalType::Date))
546 .with_repetition(repetition)
547 .with_id(id)
548 .build(),
549 DataType::Date64 => {
550 if coerce_types {
551 Type::primitive_type_builder(name, PhysicalType::INT32)
552 .with_logical_type(Some(LogicalType::Date))
553 .with_repetition(repetition)
554 .with_id(id)
555 .build()
556 } else {
557 Type::primitive_type_builder(name, PhysicalType::INT64)
558 .with_repetition(repetition)
559 .with_id(id)
560 .build()
561 }
562 }
563 DataType::Time32(TimeUnit::Second) => {
564 Type::primitive_type_builder(name, PhysicalType::INT32)
566 .with_repetition(repetition)
567 .with_id(id)
568 .build()
569 }
570 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
571 .with_logical_type(Some(LogicalType::Time {
572 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
573 unit: match unit {
574 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
575 u => unreachable!("Invalid unit for Time32: {:?}", u),
576 },
577 }))
578 .with_repetition(repetition)
579 .with_id(id)
580 .build(),
581 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
582 .with_logical_type(Some(LogicalType::Time {
583 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
584 unit: match unit {
585 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
586 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
587 u => unreachable!("Invalid unit for Time64: {:?}", u),
588 },
589 }))
590 .with_repetition(repetition)
591 .with_id(id)
592 .build(),
593 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
594 .with_repetition(repetition)
595 .with_id(id)
596 .build(),
597 DataType::Interval(_) => {
598 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
599 .with_converted_type(ConvertedType::INTERVAL)
600 .with_repetition(repetition)
601 .with_id(id)
602 .with_length(12)
603 .build()
604 }
605 DataType::Binary | DataType::LargeBinary => {
606 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
607 .with_repetition(repetition)
608 .with_id(id)
609 .build()
610 }
611 DataType::FixedSizeBinary(length) => {
612 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
613 .with_repetition(repetition)
614 .with_id(id)
615 .with_length(*length)
616 .with_logical_type(
617 #[cfg(feature = "arrow_canonical_extension_types")]
618 field
620 .try_extension_type::<Uuid>()
621 .ok()
622 .map(|_| LogicalType::Uuid),
623 #[cfg(not(feature = "arrow_canonical_extension_types"))]
624 None,
625 )
626 .build()
627 }
628 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
629 .with_repetition(repetition)
630 .with_id(id)
631 .build(),
632 DataType::Decimal32(precision, scale)
633 | DataType::Decimal64(precision, scale)
634 | DataType::Decimal128(precision, scale)
635 | DataType::Decimal256(precision, scale) => {
636 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
639 (PhysicalType::INT32, -1)
640 } else if *precision <= 18 {
641 (PhysicalType::INT64, -1)
642 } else {
643 (
644 PhysicalType::FIXED_LEN_BYTE_ARRAY,
645 decimal_length_from_precision(*precision) as i32,
646 )
647 };
648 Type::primitive_type_builder(name, physical_type)
649 .with_repetition(repetition)
650 .with_id(id)
651 .with_length(length)
652 .with_logical_type(Some(LogicalType::Decimal {
653 scale: *scale as i32,
654 precision: *precision as i32,
655 }))
656 .with_precision(*precision as i32)
657 .with_scale(*scale as i32)
658 .build()
659 }
660 DataType::Utf8 | DataType::LargeUtf8 => {
661 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
662 .with_logical_type({
663 #[cfg(feature = "arrow_canonical_extension_types")]
664 {
665 field
668 .try_extension_type::<Json>()
669 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
670 }
671 #[cfg(not(feature = "arrow_canonical_extension_types"))]
672 Some(LogicalType::String)
673 })
674 .with_repetition(repetition)
675 .with_id(id)
676 .build()
677 }
678 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
679 .with_logical_type({
680 #[cfg(feature = "arrow_canonical_extension_types")]
681 {
682 field
685 .try_extension_type::<Json>()
686 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
687 }
688 #[cfg(not(feature = "arrow_canonical_extension_types"))]
689 Some(LogicalType::String)
690 })
691 .with_repetition(repetition)
692 .with_id(id)
693 .build(),
694 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
695 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
696 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
698 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
699 } else {
700 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
701 };
702
703 Type::group_type_builder(name)
704 .with_fields(vec![Arc::new(
705 Type::group_type_builder("list")
706 .with_fields(vec![field_ref])
707 .with_repetition(Repetition::REPEATED)
708 .build()?,
709 )])
710 .with_logical_type(Some(LogicalType::List))
711 .with_repetition(repetition)
712 .with_id(id)
713 .build()
714 }
715 DataType::ListView(_) | DataType::LargeListView(_) => {
716 unimplemented!("ListView/LargeListView not implemented")
717 }
718 DataType::Struct(fields) => {
719 if fields.is_empty() {
720 return Err(arrow_err!("Parquet does not support writing empty structs",));
721 }
722 let fields = fields
724 .iter()
725 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
726 .collect::<Result<_>>()?;
727 Type::group_type_builder(name)
728 .with_fields(fields)
729 .with_repetition(repetition)
730 .with_id(id)
731 .build()
732 }
733 DataType::Map(field, _) => {
734 if let DataType::Struct(struct_fields) = field.data_type() {
735 let map_struct_name = if coerce_types {
737 PARQUET_MAP_STRUCT_NAME
738 } else {
739 field.name()
740 };
741
742 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
744 if coerce_types && fld.name() != name {
745 let f = fld.as_ref().clone().with_name(name);
746 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
747 } else {
748 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
749 }
750 };
751 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
752 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
753
754 Type::group_type_builder(name)
755 .with_fields(vec![Arc::new(
756 Type::group_type_builder(map_struct_name)
757 .with_fields(vec![key_field, val_field])
758 .with_repetition(Repetition::REPEATED)
759 .build()?,
760 )])
761 .with_logical_type(Some(LogicalType::Map))
762 .with_repetition(repetition)
763 .with_id(id)
764 .build()
765 } else {
766 Err(arrow_err!(
767 "DataType::Map should contain a struct field child",
768 ))
769 }
770 }
771 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
772 DataType::Dictionary(_, ref value) => {
773 let dict_field = field.clone().with_data_type(value.as_ref().clone());
775 arrow_to_parquet_type(&dict_field, coerce_types)
776 }
777 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
778 "Converting RunEndEncodedType to parquet not supported",
779 )),
780 }
781}
782
783fn field_id(field: &Field) -> Option<i32> {
784 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
785 value.parse().ok() }
787
788#[cfg(test)]
789mod tests {
790 use super::*;
791
792 use std::{collections::HashMap, sync::Arc};
793
794 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
795
796 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
797 use crate::file::metadata::KeyValue;
798 use crate::file::reader::FileReader;
799 use crate::{
800 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
801 schema::{parser::parse_message_type, types::SchemaDescriptor},
802 };
803
804 #[test]
805 fn test_flat_primitives() {
806 let message_type = "
807 message test_schema {
808 REQUIRED BOOLEAN boolean;
809 REQUIRED INT32 int8 (INT_8);
810 REQUIRED INT32 int16 (INT_16);
811 REQUIRED INT32 uint8 (INTEGER(8,false));
812 REQUIRED INT32 uint16 (INTEGER(16,false));
813 REQUIRED INT32 int32;
814 REQUIRED INT64 int64;
815 OPTIONAL DOUBLE double;
816 OPTIONAL FLOAT float;
817 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
818 OPTIONAL BINARY string (UTF8);
819 OPTIONAL BINARY string_2 (STRING);
820 OPTIONAL BINARY json (JSON);
821 }
822 ";
823 let parquet_group_type = parse_message_type(message_type).unwrap();
824
825 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
826 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
827
828 let arrow_fields = Fields::from(vec![
829 Field::new("boolean", DataType::Boolean, false),
830 Field::new("int8", DataType::Int8, false),
831 Field::new("int16", DataType::Int16, false),
832 Field::new("uint8", DataType::UInt8, false),
833 Field::new("uint16", DataType::UInt16, false),
834 Field::new("int32", DataType::Int32, false),
835 Field::new("int64", DataType::Int64, false),
836 Field::new("double", DataType::Float64, true),
837 Field::new("float", DataType::Float32, true),
838 Field::new("float16", DataType::Float16, true),
839 Field::new("string", DataType::Utf8, true),
840 Field::new("string_2", DataType::Utf8, true),
841 Field::new("json", DataType::Utf8, true),
842 ]);
843
844 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
845 }
846
847 #[test]
848 fn test_decimal_fields() {
849 let message_type = "
850 message test_schema {
851 REQUIRED INT32 decimal1 (DECIMAL(4,2));
852 REQUIRED INT64 decimal2 (DECIMAL(12,2));
853 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
854 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
855 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
856 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
857 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
858 }
859 ";
860
861 let parquet_group_type = parse_message_type(message_type).unwrap();
862
863 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
864 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
865
866 let arrow_fields = Fields::from(vec![
867 Field::new("decimal1", DataType::Decimal128(4, 2), false),
868 Field::new("decimal2", DataType::Decimal128(12, 2), false),
869 Field::new("decimal3", DataType::Decimal128(30, 2), false),
870 Field::new("decimal4", DataType::Decimal128(33, 2), false),
871 Field::new("decimal5", DataType::Decimal128(38, 2), false),
872 Field::new("decimal6", DataType::Decimal256(39, 2), false),
873 Field::new("decimal7", DataType::Decimal256(39, 2), false),
874 ]);
875 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
876 }
877
878 #[test]
879 fn test_byte_array_fields() {
880 let message_type = "
881 message test_schema {
882 REQUIRED BYTE_ARRAY binary;
883 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
884 }
885 ";
886
887 let parquet_group_type = parse_message_type(message_type).unwrap();
888
889 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
890 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
891
892 let arrow_fields = Fields::from(vec![
893 Field::new("binary", DataType::Binary, false),
894 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
895 ]);
896 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
897 }
898
899 #[test]
900 fn test_duplicate_fields() {
901 let message_type = "
902 message test_schema {
903 REQUIRED BOOLEAN boolean;
904 REQUIRED INT32 int8 (INT_8);
905 }
906 ";
907
908 let parquet_group_type = parse_message_type(message_type).unwrap();
909
910 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
911 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
912
913 let arrow_fields = Fields::from(vec![
914 Field::new("boolean", DataType::Boolean, false),
915 Field::new("int8", DataType::Int8, false),
916 ]);
917 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
918
919 let converted_arrow_schema =
920 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
921 .unwrap();
922 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
923 }
924
    #[test]
    fn test_parquet_lists() {
        // Expected Arrow fields, pushed in the same order as the columns of `message_type`.
        let mut arrow_fields = Vec::new();

        // Each column below uses a different LIST layout: standard three-level lists,
        // a nested list, and several two-level forms whose repeated child is either a
        // primitive or a group.
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // Three-level list: required list of nullable strings.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // Three-level list: nullable list of non-null strings.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // List of lists: the inner element is itself a three-level LIST group.
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // Repeated group `element` with a single field: the expected element is that
        // field (`str`), not a struct.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // Repeated primitive: `REPEATED INT32 element` is itself the list element.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // Repeated group with two fields: the element is a struct named after the group.
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // Repeated group named `array`: expected as a single-field struct element.
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // Repeated group named `<list name>_tuple`: also expected as a struct element.
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // A bare REPEATED primitive outside any LIST group: expected as a non-null list
        // whose element shares the column name.
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        // Compare one column at a time; the index in the message identifies a failing case.
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }
1127
    #[test]
    fn test_parquet_list_nullable() {
        // Expected Arrow fields, in column order.
        let mut arrow_fields = Vec::new();

        // The three combinations of list vs. element nullability in a three-level list.
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // Required list, nullable elements.
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // Nullable list, non-null elements.
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Required list, non-null elements.
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1205
    #[test]
    fn test_parquet_maps() {
        // Expected Arrow fields, in column order.
        let mut arrow_fields = Vec::new();

        // MAP layouts: a standard `key_value` group, custom entry/field names, the
        // MAP_KEY_VALUE annotation, and an OPTIONAL key.
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // Standard map: required map with nullable values.
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // Entry group and key/value field names are preserved as written (`map`, `str`,
        // `num`); the OPTIONAL map is nullable.
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // The MAP_KEY_VALUE annotation is converted like MAP.
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // The key is declared OPTIONAL in Parquet but is still expected to be
        // non-nullable in Arrow.
        {
            arrow_fields.push(Field::new_map(
                "my_map4",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, false),
                false,
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1323
1324 #[test]
1325 fn test_nested_schema() {
1326 let mut arrow_fields = Vec::new();
1327 {
1328 let group1_fields = Fields::from(vec![
1329 Field::new("leaf1", DataType::Boolean, false),
1330 Field::new("leaf2", DataType::Int32, false),
1331 ]);
1332 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1333 arrow_fields.push(group1_struct);
1334
1335 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1336 arrow_fields.push(leaf3_field);
1337 }
1338
1339 let message_type = "
1340 message test_schema {
1341 REQUIRED GROUP group1 {
1342 REQUIRED BOOLEAN leaf1;
1343 REQUIRED INT32 leaf2;
1344 }
1345 REQUIRED INT64 leaf3;
1346 }
1347 ";
1348 let parquet_group_type = parse_message_type(message_type).unwrap();
1349
1350 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1351 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1352 let converted_fields = converted_arrow_schema.fields();
1353
1354 assert_eq!(arrow_fields.len(), converted_fields.len());
1355 for i in 0..arrow_fields.len() {
1356 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1357 }
1358 }
1359
/// Projecting a subset of leaves keeps only the selected leaves, wrapped in
/// their parent groups.
///
/// The mask indices are given out of order and with a duplicate (`4`), yet the
/// expected output lists fields in file order with `leaf5` appearing once.
#[test]
fn test_nested_schema_partial() {
    let int64 = |name: &str| Field::new(name, DataType::Int64, false);
    let expected = vec![
        Field::new("group1", DataType::Struct(vec![int64("leaf1")].into()), false),
        Field::new("group2", DataType::Struct(vec![int64("leaf4")].into()), false),
        int64("leaf5"),
    ];

    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";
    let schema_descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // Leaf indices: 0 = leaf1, 3 = leaf4, 4 = leaf5.
    let projection = ProjectionMask::leaves(&schema_descr, [3, 0, 4, 4]);
    let actual = parquet_to_arrow_schema_by_columns(&schema_descr, projection, None).unwrap();
    let actual_fields = actual.fields();

    assert_eq!(expected.len(), actual_fields.len());
    for (want, got) in expected.iter().zip(actual_fields.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1410
/// The projected schema follows file order no matter how the projection is
/// spelled, whether by out-of-order leaf indices or by out-of-order dotted
/// column paths.
#[test]
fn test_nested_schema_partial_ordering() {
    let int64 = |name: &str| Field::new(name, DataType::Int64, false);
    let expected = vec![
        Field::new("group1", DataType::Struct(vec![int64("leaf1")].into()), false),
        Field::new("group2", DataType::Struct(vec![int64("leaf4")].into()), false),
        int64("leaf5"),
    ];

    let message_type = "
    message test_schema {
        REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
        }
        REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
        }
        REQUIRED INT64 leaf5;
    }
    ";
    let schema_descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // Converts with the given projection and compares against `expected`.
    let check = |projection: ProjectionMask| {
        let actual = parquet_to_arrow_schema_by_columns(&schema_descr, projection, None).unwrap();
        let actual_fields = actual.fields();
        assert_eq!(expected.len(), actual_fields.len());
        for (want, got) in expected.iter().zip(actual_fields.iter()) {
            assert_eq!(want, got.as_ref());
        }
    };

    check(ProjectionMask::leaves(&schema_descr, [3, 0, 4]));
    check(ProjectionMask::columns(
        &schema_descr,
        ["group2.leaf4", "group1.leaf1", "leaf5"],
    ));
}
1472
/// Bare `REPEATED GROUP`s with no LIST annotation become non-nullable Arrow
/// lists of non-nullable structs, and they nest recursively.
#[test]
fn test_repeated_nested_schema() {
    let inner = Field::new_list(
        "innerGroup",
        Field::new_struct(
            "innerGroup",
            vec![Field::new("leaf3", DataType::Int32, true)],
            false,
        ),
        false,
    );
    let outer = Field::new_list(
        "outerGroup",
        Field::new_struct(
            "outerGroup",
            vec![Field::new("leaf2", DataType::Int32, true), inner],
            false,
        ),
        false,
    );
    let expected = [Field::new("leaf1", DataType::Int32, true), outer];

    let message_type = "
    message test_schema {
        OPTIONAL INT32 leaf1;
        REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
                OPTIONAL INT32 leaf3;
            }
        }
    }
    ";
    let schema_descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    let actual = parquet_to_arrow_schema(&schema_descr, None).unwrap();
    let actual_fields = actual.fields();

    assert_eq!(expected.len(), actual_fields.len());
    for (want, got) in expected.iter().zip(actual_fields.iter()) {
        assert_eq!(want, got.as_ref());
    }
}
1523
/// Every leaf column descriptor converts individually to the expected Arrow
/// field. Bare repeated primitives become non-nullable lists, and legacy
/// timestamp annotations map to UTC timestamps.
#[test]
fn test_column_desc_to_field() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 uint8 (INTEGER(8,false));
        REQUIRED INT32 int16 (INT_16);
        REQUIRED INT32 uint16 (INTEGER(16,false));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (UTF8);
        REPEATED BOOLEAN bools;
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME_MILLIS);
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_nano (TIME(NANOS,false));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
        REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
        REPEATED INT32 int_list;
        REPEATED BINARY byte_list;
        REPEATED BINARY string_list (UTF8);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
    }
    ";

    // A bare REPEATED primitive: a non-nullable list whose non-nullable item
    // shares the column's name.
    let repeated = |name: &str, data_type: DataType| {
        Field::new_list(name, Field::new(name, data_type, false), false)
    };
    let utc_timestamp = |name: &str, unit: TimeUnit, nullable: bool| {
        Field::new(name, DataType::Timestamp(unit, Some("UTC".into())), nullable)
    };

    let expected = vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        repeated("bools", DataType::Boolean),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
        utc_timestamp("ts_milli", TimeUnit::Millisecond, true),
        utc_timestamp("ts_micro", TimeUnit::Microsecond, false),
        utc_timestamp("ts_nano", TimeUnit::Nanosecond, false),
        repeated("int_list", DataType::Int32),
        repeated("byte_list", DataType::Binary),
        repeated("string_list", DataType::Utf8),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
    ];

    let schema_descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    let actual: Vec<Field> = schema_descr
        .columns()
        .iter()
        .map(|column| parquet_to_arrow_field(column))
        .collect::<Result<Vec<Field>>>()
        .unwrap();

    assert_eq!(expected, actual);
}
1622
/// `ArrowSchemaConverter::with_coerce_types` controls how list and map
/// children are named in the generated Parquet schema.
///
/// With coercion, the non-standard Arrow child names (`item`, `entries`,
/// `keys`, `values`) are rewritten to the Parquet-spec names (`element`,
/// `key_value`, `key`, `value`). Without it, the Arrow names are kept.
///
/// A column count alone cannot tell the two cases apart, so this test checks
/// each leaf's full path as well as its physical type and definition and
/// repetition levels.
#[test]
fn test_coerced_map_list() {
    // Arrow schema that deliberately uses non-Parquet child names.
    let arrow_fields = vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ];
    let arrow_schema = Schema::new(arrow_fields);

    // Expected layout when names are coerced to the Parquet specification.
    let coerced_message = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP key_value {
                REQUIRED BINARY key (STRING);
                OPTIONAL INT32 value;
            }
        }
    }
    ";

    // Expected layout when the Arrow child names are preserved.
    let uncoerced_message = "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN item;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP entries {
                REQUIRED BINARY keys (STRING);
                OPTIONAL INT32 values;
            }
        }
    }
    ";

    for (coerce_types, message_type) in [(true, coerced_message), (false, uncoerced_message)] {
        let expected =
            SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
        let converted = ArrowSchemaConverter::new()
            .with_coerce_types(coerce_types)
            .convert(&arrow_schema)
            .unwrap();

        assert_eq!(
            expected.columns().len(),
            converted.columns().len(),
            "coerce_types={coerce_types}"
        );
        for (want, got) in expected.columns().iter().zip(converted.columns()) {
            // The path carries the (possibly coerced) group and leaf names.
            assert_eq!(want.path(), got.path(), "coerce_types={coerce_types}");
            assert_eq!(
                want.physical_type(),
                got.physical_type(),
                "coerce_types={coerce_types}"
            );
            assert_eq!(
                want.max_def_level(),
                got.max_def_level(),
                "coerce_types={coerce_types}"
            );
            assert_eq!(
                want.max_rep_level(),
                got.max_rep_level(),
                "coerce_types={coerce_types}"
            );
        }
    }
}
1697
/// Converting an Arrow schema to Parquet yields the leaf columns described by
/// the hand-written message below.
///
/// Columns that carry a logical type on the Parquet side are compared in full.
/// Columns without one are compared only by name, physical type and converted
/// type. NOTE(review): presumably this is because the Arrow->Parquet
/// conversion may attach a logical type the hand-written message omits.
#[test]
fn test_field_to_column_desc() {
    let message_type = "
    message arrow_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 int16 (INTEGER(16,true));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (STRING);
        OPTIONAL GROUP bools (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        REQUIRED GROUP bools_non_null (LIST) {
            REPEATED GROUP list {
                REQUIRED BOOLEAN element;
            }
        }
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME(MILLIS,false));
        OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
        REQUIRED INT64 ts_seconds;
        REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
        REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
        REQUIRED GROUP struct {
            REQUIRED BOOLEAN bools;
            REQUIRED INT32 uint32 (INTEGER(32,false));
            REQUIRED GROUP int32 (LIST) {
                REPEATED GROUP list {
                    OPTIONAL INT32 element;
                }
            }
        }
        REQUIRED BINARY dictionary_strings (STRING);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
    }
    ";
    let expected_schema =
        SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // Field metadata attached to the UTC-adjusted time columns.
    let adjusted_to_utc =
        || HashMap::from([("adjusted_to_utc".to_string(), "".to_string())]);

    let arrow_schema = Schema::new(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("element", DataType::Boolean, true),
            true,
        ),
        Field::new_list(
            "bools_non_null",
            Field::new("element", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new(
            "time_milli_utc",
            DataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new(
            "time_micro_utc",
            DataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(adjusted_to_utc()),
        Field::new(
            "ts_milli",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "ts_micro",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
        Field::new(
            "ts_seconds",
            DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_micro_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_negative_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
            false,
        ),
        Field::new(
            "ts_micro_non_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
            false,
        ),
        Field::new_struct(
            "struct",
            vec![
                Field::new("bools", DataType::Boolean, false),
                Field::new("uint32", DataType::UInt32, false),
                Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
            ],
            false,
        ),
        Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        Field::new("decimal128", DataType::Decimal128(38, 2), false),
        Field::new("decimal256", DataType::Decimal256(39, 2), false),
    ]);
    let converted_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

    assert_eq!(
        expected_schema.columns().len(),
        converted_schema.columns().len()
    );
    for (expected, actual) in expected_schema
        .columns()
        .iter()
        .zip(converted_schema.columns())
    {
        if expected.logical_type().is_some() {
            assert_eq!(expected, actual);
        } else {
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.physical_type(), actual.physical_type());
            assert_eq!(expected.converted_type(), actual.converted_type());
        }
    }
}
1874
/// Converting an Arrow schema that contains an empty struct must fail with a
/// descriptive error rather than produce a Parquet schema.
#[test]
fn test_empty_struct_field() {
    let arrow_schema = Schema::new(vec![Field::new(
        "struct",
        DataType::Struct(Fields::empty()),
        false,
    )]);

    // Check the returned error directly. `#[should_panic]` on a bare `unwrap`
    // would also pass if the conversion panicked for an unrelated reason.
    let Err(err) = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)
    else {
        panic!("converting a schema with an empty struct should fail");
    };
    let message = err.to_string();
    assert!(
        message.contains("Parquet does not support writing empty structs"),
        "unexpected error: {message}"
    );
}
1890
/// Key/value metadata is carried into the Arrow schema metadata. An entry
/// whose value is `None` (`baz` here) does not appear in the result.
#[test]
fn test_metadata() {
    let message_type = "
    message test_schema {
        OPTIONAL BINARY string (STRING);
    }
    ";
    let schema_descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    let key_value_metadata = vec![
        KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
        KeyValue::new("baz".to_owned(), None),
    ];
    let expected: HashMap<String, String> =
        HashMap::from([("foo".to_owned(), "bar".to_owned())]);

    let arrow_schema = parquet_to_arrow_schema(&schema_descr, Some(&key_value_metadata)).unwrap();
    assert_eq!(arrow_schema.metadata(), &expected);
}
1914
/// Writes an empty file with a wide Arrow schema, reads it back, and checks
/// two things: the Arrow schema survives the round trip unchanged, and every
/// `PARQUET_FIELD_ID_META_KEY` annotation lands on the matching Parquet node.
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
    // Builds an owned metadata map from borrowed key/value pairs.
    let meta = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    };

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, false)
                .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
            Field::new("c2", DataType::Binary, false),
            Field::new("c3", DataType::FixedSizeBinary(3), false),
            Field::new("c4", DataType::Boolean, false),
            Field::new("c5", DataType::Date32, false),
            Field::new("c6", DataType::Date64, false),
            Field::new("c7", DataType::Time32(TimeUnit::Second), false),
            Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
            Field::new(
                "c16",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "c17",
                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                false,
            ),
            Field::new(
                "c18",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new_list(
                "c21",
                Field::new_list_field(DataType::Boolean, true)
                    .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                false,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new_list_field(DataType::Boolean, true)),
                    5,
                ),
                false,
            ),
            Field::new_list(
                "c23",
                Field::new_large_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, false),
                                Field::new("c", DataType::Float32, false),
                                Field::new("d", DataType::Float16, false),
                            ]
                            .into(),
                        ),
                        false,
                    ),
                    true,
                ),
                false,
            ),
            Field::new(
                "c24",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::Utf8, false),
                    Field::new("b", DataType::UInt16, false),
                ])),
                false,
            ),
            Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
            Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
            #[allow(deprecated)]
            Field::new_dict(
                "c31",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
                123,
                true,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
            Field::new("c32", DataType::LargeBinary, true),
            Field::new("c33", DataType::LargeUtf8, true),
            Field::new_large_list(
                "c34",
                Field::new_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, true),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                    true,
                ),
                true,
            ),
            Field::new("c35", DataType::Null, true),
            Field::new("c36", DataType::Decimal128(2, 1), false),
            Field::new("c37", DataType::Decimal256(50, 20), false),
            Field::new("c38", DataType::Decimal128(18, 12), true),
            Field::new_map(
                "c39",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                false,
                true,
            ),
            Field::new_map(
                "c40",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                false,
                true,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
            Field::new_map(
                "c41",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                    true,
                ),
                false,
                false,
            ),
            Field::new("c42", DataType::Decimal32(5, 2), false),
            Field::new("c43", DataType::Decimal64(18, 12), true),
        ],
        meta(&[("Key", "Value")]),
    );

    // Write a file containing only the schema (no record batches).
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let roundtripped = reader.schema();
    assert_eq!(&schema, roundtripped.as_ref());

    // Walk the Parquet type tree and record "dotted.path -> id" for every node
    // that carries a field id.
    let root = reader.parquet_schema().root_schema_ptr();
    let mut pending = vec![(root.name().to_string(), root)];
    let mut ids = Vec::new();
    while let Some((path, node)) = pending.pop() {
        if node.is_group() {
            pending.extend(
                node.get_fields()
                    .iter()
                    .map(|child| (format!("{path}.{}", child.name()), Arc::clone(child))),
            );
        }

        let info = node.get_basic_info();
        if info.has_id() {
            ids.push(format!("{path} -> {}", info.id()));
        }
    }
    ids.sort_unstable();
    let ids: Vec<&str> = ids.iter().map(String::as_str).collect();

    assert_eq!(
        &ids,
        &[
            "arrow_schema.c1 -> 2",
            "arrow_schema.c21 -> 4",
            "arrow_schema.c21.list.item -> 5",
            "arrow_schema.c31 -> 6",
            "arrow_schema.c40 -> 7",
            "arrow_schema.c40.my_entries.my_key -> 8",
            "arrow_schema.c40.my_entries.my_value -> 9",
            "arrow_schema.c40.my_entries.my_value.list.item -> 10",
            "arrow_schema.c41.my_entries.my_value.list.item -> 11",
        ]
    );

    Ok(())
}
2132
/// Field ids written via Arrow field metadata survive a full cycle: Arrow is
/// written to Parquet, the raw Parquet schema is read back and converted to
/// Arrow, and that Arrow schema is converted to Parquet again.
#[test]
fn test_read_parquet_field_ids_raw() -> Result<()> {
    // A nullable Utf8 column tagged with the given Parquet field id.
    let with_id = |name: &str, id: &str| {
        Field::new(name, DataType::Utf8, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };
    let schema = Schema::new_with_metadata(vec![with_id("c1", "1"), with_id("c2", "2")], HashMap::new());

    let parquet_bytes = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?.into_inner()?;

    let reader =
        crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
    let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
    let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

    let regenerated = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)?;
    let top_level = regenerated.root_schema().get_fields();
    assert_eq!(top_level.len(), 2);

    let ids: Vec<i32> = top_level
        .iter()
        .map(|field| field.get_basic_info().id())
        .collect();
    assert_eq!(ids, vec![1, 2]);

    Ok(())
}
2170
/// List-like Arrow types whose item fields use non-standard names (`array`,
/// `items`) round-trip through a written file with an identical schema.
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let metadata: HashMap<String, String> =
        HashMap::from([("Key".to_string(), "Value".to_string())]);

    let items_struct = Field::new_struct(
        "items",
        vec![
            Field::new("a", DataType::Int16, true),
            Field::new("b", DataType::Float64, false),
        ],
        true,
    );
    let schema = Schema::new_with_metadata(
        vec![
            Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new("items", DataType::Boolean, false)),
                    5,
                ),
                false,
            ),
            Field::new_list(
                "c23",
                Field::new_large_list("items", items_struct, true),
                true,
            ),
        ],
        metadata,
    );

    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, reader.schema().as_ref());
    Ok(())
}
2221
/// An empty string is not a valid encoded Arrow schema.
#[test]
fn test_get_arrow_schema_from_metadata() {
    let parsed = get_arrow_schema_from_metadata("");
    assert!(parsed.is_err());
}
2226
/// A `FixedSizeBinary(16)` field tagged with the canonical Arrow `Uuid`
/// extension type is written with the Parquet `UUID` logical type.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let arrow_schema = Schema::new(vec![uuid_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();
    assert_eq!(logical_type, Some(LogicalType::Uuid));

    Ok(())
}
2250
/// A `Utf8` field tagged with the canonical Arrow `Json` extension type is
/// written with the Parquet `JSON` logical type.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    let json_field = Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let arrow_schema = Schema::new(vec![json_field]);

    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let logical_type = parquet_schema.column(0).logical_type();
    assert_eq!(logical_type, Some(LogicalType::Json));

    Ok(())
}
2275}