1use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26#[cfg(feature = "arrow_canonical_extension_types")]
27use arrow_schema::extension::{Json, Uuid};
28use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
29
30use crate::basic::{
31 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
32};
33use crate::errors::{ParquetError, Result};
34use crate::file::{metadata::KeyValue, properties::WriterProperties};
35use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
36
37mod complex;
38mod primitive;
39
40use crate::arrow::ProjectionMask;
41pub(crate) use complex::{ParquetField, ParquetFieldType};
42
43use super::PARQUET_FIELD_ID_META_KEY;
44
45pub fn parquet_to_arrow_schema(
50 parquet_schema: &SchemaDescriptor,
51 key_value_metadata: Option<&Vec<KeyValue>>,
52) -> Result<Schema> {
53 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
54}
55
56pub fn parquet_to_arrow_schema_by_columns(
59 parquet_schema: &SchemaDescriptor,
60 mask: ProjectionMask,
61 key_value_metadata: Option<&Vec<KeyValue>>,
62) -> Result<Schema> {
63 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
64}
65
66pub(crate) fn parquet_to_arrow_schema_and_fields(
68 parquet_schema: &SchemaDescriptor,
69 mask: ProjectionMask,
70 key_value_metadata: Option<&Vec<KeyValue>>,
71) -> Result<(Schema, Option<ParquetField>)> {
72 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
73 let maybe_schema = metadata
74 .remove(super::ARROW_SCHEMA_META_KEY)
75 .map(|value| get_arrow_schema_from_metadata(&value))
76 .transpose()?;
77
78 if let Some(arrow_schema) = &maybe_schema {
80 arrow_schema.metadata().iter().for_each(|(k, v)| {
81 metadata.entry(k.clone()).or_insert_with(|| v.clone());
82 });
83 }
84
85 let hint = maybe_schema.as_ref().map(|s| s.fields());
86 let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
87 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
88 Ok((schema, field_levels.levels))
89}
90
91#[derive(Debug, Clone)]
99pub struct FieldLevels {
100 pub(crate) fields: Fields,
101 pub(crate) levels: Option<ParquetField>,
102}
103
104pub fn parquet_to_arrow_field_levels(
119 schema: &SchemaDescriptor,
120 mask: ProjectionMask,
121 hint: Option<&Fields>,
122) -> Result<FieldLevels> {
123 match complex::convert_schema(schema, mask, hint)? {
124 Some(field) => match &field.arrow_type {
125 DataType::Struct(fields) => Ok(FieldLevels {
126 fields: fields.clone(),
127 levels: Some(field),
128 }),
129 _ => unreachable!(),
130 },
131 None => Ok(FieldLevels {
132 fields: Fields::empty(),
133 levels: None,
134 }),
135 }
136}
137
138fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
140 let decoded = BASE64_STANDARD.decode(encoded_meta);
141 match decoded {
142 Ok(bytes) => {
143 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
144 &bytes[8..]
145 } else {
146 bytes.as_slice()
147 };
148 match arrow_ipc::root_as_message(slice) {
149 Ok(message) => message
150 .header_as_schema()
151 .map(arrow_ipc::convert::fb_to_schema)
152 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
153 Err(err) => {
154 Err(arrow_err!(
156 "Unable to get root as message stored in {}: {:?}",
157 super::ARROW_SCHEMA_META_KEY,
158 err
159 ))
160 }
161 }
162 }
163 Err(err) => {
164 Err(arrow_err!(
166 "Unable to decode the encoded schema stored in {}, {:?}",
167 super::ARROW_SCHEMA_META_KEY,
168 err
169 ))
170 }
171 }
172}
173
174pub fn encode_arrow_schema(schema: &Schema) -> String {
176 let options = writer::IpcWriteOptions::default();
177 #[allow(deprecated)]
178 let mut dictionary_tracker =
179 writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
180 let data_gen = writer::IpcDataGenerator::default();
181 let mut serialized_schema =
182 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
183
184 let schema_len = serialized_schema.ipc_message.len();
187 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
188 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
189 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
190 len_prefix_schema.append(&mut serialized_schema.ipc_message);
191
192 BASE64_STANDARD.encode(&len_prefix_schema)
193}
194
195pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
198 let encoded = encode_arrow_schema(schema);
199
200 let schema_kv = KeyValue {
201 key: super::ARROW_SCHEMA_META_KEY.to_string(),
202 value: Some(encoded),
203 };
204
205 let meta = props
206 .key_value_metadata
207 .get_or_insert_with(Default::default);
208
209 let schema_meta = meta
211 .iter()
212 .enumerate()
213 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
214 match schema_meta {
215 Some((i, _)) => {
216 meta.remove(i);
217 meta.push(schema_kv);
218 }
219 None => {
220 meta.push(schema_kv);
221 }
222 }
223}
224
/// Converts an Arrow [`Schema`] into a Parquet [`SchemaDescriptor`].
///
/// Configure it with the builder-style methods, then call `convert`.
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    // Name given to the root group of the generated Parquet schema.
    schema_root: &'a str,
    // Whether Arrow types are coerced toward Parquet-native representations.
    coerce_types: bool,
}
273
274impl Default for ArrowSchemaConverter<'_> {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
280impl<'a> ArrowSchemaConverter<'a> {
281 pub fn new() -> Self {
283 Self {
284 schema_root: "arrow_schema",
285 coerce_types: false,
286 }
287 }
288
289 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
320 self.coerce_types = coerce_types;
321 self
322 }
323
324 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
326 self.schema_root = schema_root;
327 self
328 }
329
330 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
334 let fields = schema
335 .fields()
336 .iter()
337 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
338 .collect::<Result<_>>()?;
339 let group = Type::group_type_builder(self.schema_root)
340 .with_fields(fields)
341 .build()?;
342 Ok(SchemaDescriptor::new(Arc::new(group)))
343 }
344}
345
346#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
351pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
352 ArrowSchemaConverter::new().convert(schema)
353}
354
355fn parse_key_value_metadata(
356 key_value_metadata: Option<&Vec<KeyValue>>,
357) -> Option<HashMap<String, String>> {
358 match key_value_metadata {
359 Some(key_values) => {
360 let map: HashMap<String, String> = key_values
361 .iter()
362 .filter_map(|kv| {
363 kv.value
364 .as_ref()
365 .map(|value| (kv.key.clone(), value.clone()))
366 })
367 .collect();
368
369 if map.is_empty() {
370 None
371 } else {
372 Some(map)
373 }
374 }
375 None => None,
376 }
377}
378
379pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
381 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
382 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
383
384 let basic_info = parquet_column.self_type().get_basic_info();
385 let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
386 2
387 } else {
388 1
389 });
390 if basic_info.has_id() {
391 meta.insert(
392 PARQUET_FIELD_ID_META_KEY.to_string(),
393 basic_info.id().to_string(),
394 );
395 }
396 #[cfg(feature = "arrow_canonical_extension_types")]
397 if let Some(logical_type) = basic_info.logical_type() {
398 match logical_type {
399 LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
400 LogicalType::Json => ret.try_with_extension_type(Json::default())?,
401 _ => {}
402 }
403 }
404 if !meta.is_empty() {
405 ret.set_metadata(meta);
406 }
407
408 Ok(ret)
409}
410
/// Returns the number of bytes needed to store a decimal of the given
/// `precision` as a `FIXED_LEN_BYTE_ARRAY`.
///
/// The length is the smallest `n` such that `8 * n` bits hold
/// `log2(10^precision + 1)` magnitude bits plus one sign bit:
/// `n = ceil((log2(10^precision + 1) + 1) / 8)`.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let largest_magnitude = 10.0_f64.powi(i32::from(precision));
    let bits_with_sign = (largest_magnitude + 1.0).log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
421
422fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
424 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
425 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
426 const PARQUET_KEY_FIELD_NAME: &str = "key";
427 const PARQUET_VALUE_FIELD_NAME: &str = "value";
428
429 let name = field.name().as_str();
430 let repetition = if field.is_nullable() {
431 Repetition::OPTIONAL
432 } else {
433 Repetition::REQUIRED
434 };
435 let id = field_id(field);
436 match field.data_type() {
438 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
439 .with_logical_type(Some(LogicalType::Unknown))
440 .with_repetition(repetition)
441 .with_id(id)
442 .build(),
443 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
444 .with_repetition(repetition)
445 .with_id(id)
446 .build(),
447 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
448 .with_logical_type(Some(LogicalType::Integer {
449 bit_width: 8,
450 is_signed: true,
451 }))
452 .with_repetition(repetition)
453 .with_id(id)
454 .build(),
455 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
456 .with_logical_type(Some(LogicalType::Integer {
457 bit_width: 16,
458 is_signed: true,
459 }))
460 .with_repetition(repetition)
461 .with_id(id)
462 .build(),
463 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
464 .with_repetition(repetition)
465 .with_id(id)
466 .build(),
467 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
468 .with_repetition(repetition)
469 .with_id(id)
470 .build(),
471 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
472 .with_logical_type(Some(LogicalType::Integer {
473 bit_width: 8,
474 is_signed: false,
475 }))
476 .with_repetition(repetition)
477 .with_id(id)
478 .build(),
479 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
480 .with_logical_type(Some(LogicalType::Integer {
481 bit_width: 16,
482 is_signed: false,
483 }))
484 .with_repetition(repetition)
485 .with_id(id)
486 .build(),
487 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
488 .with_logical_type(Some(LogicalType::Integer {
489 bit_width: 32,
490 is_signed: false,
491 }))
492 .with_repetition(repetition)
493 .with_id(id)
494 .build(),
495 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
496 .with_logical_type(Some(LogicalType::Integer {
497 bit_width: 64,
498 is_signed: false,
499 }))
500 .with_repetition(repetition)
501 .with_id(id)
502 .build(),
503 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
504 .with_repetition(repetition)
505 .with_id(id)
506 .with_logical_type(Some(LogicalType::Float16))
507 .with_length(2)
508 .build(),
509 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
510 .with_repetition(repetition)
511 .with_id(id)
512 .build(),
513 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
514 .with_repetition(repetition)
515 .with_id(id)
516 .build(),
517 DataType::Timestamp(TimeUnit::Second, _) => {
518 Type::primitive_type_builder(name, PhysicalType::INT64)
520 .with_repetition(repetition)
521 .with_id(id)
522 .build()
523 }
524 DataType::Timestamp(time_unit, tz) => {
525 Type::primitive_type_builder(name, PhysicalType::INT64)
526 .with_logical_type(Some(LogicalType::Timestamp {
527 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
529 unit: match time_unit {
530 TimeUnit::Second => unreachable!(),
531 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
532 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
533 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
534 },
535 }))
536 .with_repetition(repetition)
537 .with_id(id)
538 .build()
539 }
540 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
541 .with_logical_type(Some(LogicalType::Date))
542 .with_repetition(repetition)
543 .with_id(id)
544 .build(),
545 DataType::Date64 => {
546 if coerce_types {
547 Type::primitive_type_builder(name, PhysicalType::INT32)
548 .with_logical_type(Some(LogicalType::Date))
549 .with_repetition(repetition)
550 .with_id(id)
551 .build()
552 } else {
553 Type::primitive_type_builder(name, PhysicalType::INT64)
554 .with_repetition(repetition)
555 .with_id(id)
556 .build()
557 }
558 }
559 DataType::Time32(TimeUnit::Second) => {
560 Type::primitive_type_builder(name, PhysicalType::INT32)
562 .with_repetition(repetition)
563 .with_id(id)
564 .build()
565 }
566 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
567 .with_logical_type(Some(LogicalType::Time {
568 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
569 unit: match unit {
570 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
571 u => unreachable!("Invalid unit for Time32: {:?}", u),
572 },
573 }))
574 .with_repetition(repetition)
575 .with_id(id)
576 .build(),
577 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
578 .with_logical_type(Some(LogicalType::Time {
579 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
580 unit: match unit {
581 TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
582 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
583 u => unreachable!("Invalid unit for Time64: {:?}", u),
584 },
585 }))
586 .with_repetition(repetition)
587 .with_id(id)
588 .build(),
589 DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
590 DataType::Interval(_) => {
591 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
592 .with_converted_type(ConvertedType::INTERVAL)
593 .with_repetition(repetition)
594 .with_id(id)
595 .with_length(12)
596 .build()
597 }
598 DataType::Binary | DataType::LargeBinary => {
599 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
600 .with_repetition(repetition)
601 .with_id(id)
602 .build()
603 }
604 DataType::FixedSizeBinary(length) => {
605 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
606 .with_repetition(repetition)
607 .with_id(id)
608 .with_length(*length)
609 .with_logical_type(
610 #[cfg(feature = "arrow_canonical_extension_types")]
611 field
613 .try_extension_type::<Uuid>()
614 .ok()
615 .map(|_| LogicalType::Uuid),
616 #[cfg(not(feature = "arrow_canonical_extension_types"))]
617 None,
618 )
619 .build()
620 }
621 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
622 .with_repetition(repetition)
623 .with_id(id)
624 .build(),
625 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
626 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
629 (PhysicalType::INT32, -1)
630 } else if *precision <= 18 {
631 (PhysicalType::INT64, -1)
632 } else {
633 (
634 PhysicalType::FIXED_LEN_BYTE_ARRAY,
635 decimal_length_from_precision(*precision) as i32,
636 )
637 };
638 Type::primitive_type_builder(name, physical_type)
639 .with_repetition(repetition)
640 .with_id(id)
641 .with_length(length)
642 .with_logical_type(Some(LogicalType::Decimal {
643 scale: *scale as i32,
644 precision: *precision as i32,
645 }))
646 .with_precision(*precision as i32)
647 .with_scale(*scale as i32)
648 .build()
649 }
650 DataType::Utf8 | DataType::LargeUtf8 => {
651 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
652 .with_logical_type({
653 #[cfg(feature = "arrow_canonical_extension_types")]
654 {
655 field
658 .try_extension_type::<Json>()
659 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
660 }
661 #[cfg(not(feature = "arrow_canonical_extension_types"))]
662 Some(LogicalType::String)
663 })
664 .with_repetition(repetition)
665 .with_id(id)
666 .build()
667 }
668 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
669 .with_logical_type({
670 #[cfg(feature = "arrow_canonical_extension_types")]
671 {
672 field
675 .try_extension_type::<Json>()
676 .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
677 }
678 #[cfg(not(feature = "arrow_canonical_extension_types"))]
679 Some(LogicalType::String)
680 })
681 .with_repetition(repetition)
682 .with_id(id)
683 .build(),
684 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
685 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
686 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
688 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
689 } else {
690 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
691 };
692
693 Type::group_type_builder(name)
694 .with_fields(vec![Arc::new(
695 Type::group_type_builder("list")
696 .with_fields(vec![field_ref])
697 .with_repetition(Repetition::REPEATED)
698 .build()?,
699 )])
700 .with_logical_type(Some(LogicalType::List))
701 .with_repetition(repetition)
702 .with_id(id)
703 .build()
704 }
705 DataType::ListView(_) | DataType::LargeListView(_) => {
706 unimplemented!("ListView/LargeListView not implemented")
707 }
708 DataType::Struct(fields) => {
709 if fields.is_empty() {
710 return Err(arrow_err!("Parquet does not support writing empty structs",));
711 }
712 let fields = fields
714 .iter()
715 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
716 .collect::<Result<_>>()?;
717 Type::group_type_builder(name)
718 .with_fields(fields)
719 .with_repetition(repetition)
720 .with_id(id)
721 .build()
722 }
723 DataType::Map(field, _) => {
724 if let DataType::Struct(struct_fields) = field.data_type() {
725 let map_struct_name = if coerce_types {
727 PARQUET_MAP_STRUCT_NAME
728 } else {
729 field.name()
730 };
731
732 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
734 if coerce_types && fld.name() != name {
735 let f = fld.as_ref().clone().with_name(name);
736 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
737 } else {
738 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
739 }
740 };
741 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
742 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
743
744 Type::group_type_builder(name)
745 .with_fields(vec![Arc::new(
746 Type::group_type_builder(map_struct_name)
747 .with_fields(vec![key_field, val_field])
748 .with_repetition(Repetition::REPEATED)
749 .build()?,
750 )])
751 .with_logical_type(Some(LogicalType::Map))
752 .with_repetition(repetition)
753 .with_id(id)
754 .build()
755 } else {
756 Err(arrow_err!(
757 "DataType::Map should contain a struct field child",
758 ))
759 }
760 }
761 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
762 DataType::Dictionary(_, ref value) => {
763 let dict_field = field.clone().with_data_type(value.as_ref().clone());
765 arrow_to_parquet_type(&dict_field, coerce_types)
766 }
767 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
768 "Converting RunEndEncodedType to parquet not supported",
769 )),
770 }
771}
772
773fn field_id(field: &Field) -> Option<i32> {
774 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
775 value.parse().ok() }
777
778#[cfg(test)]
779mod tests {
780 use super::*;
781
782 use std::{collections::HashMap, sync::Arc};
783
784 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
785
786 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
787 use crate::file::metadata::KeyValue;
788 use crate::file::reader::FileReader;
789 use crate::{
790 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
791 schema::{parser::parse_message_type, types::SchemaDescriptor},
792 };
793
794 #[test]
795 fn test_flat_primitives() {
796 let message_type = "
797 message test_schema {
798 REQUIRED BOOLEAN boolean;
799 REQUIRED INT32 int8 (INT_8);
800 REQUIRED INT32 int16 (INT_16);
801 REQUIRED INT32 uint8 (INTEGER(8,false));
802 REQUIRED INT32 uint16 (INTEGER(16,false));
803 REQUIRED INT32 int32;
804 REQUIRED INT64 int64;
805 OPTIONAL DOUBLE double;
806 OPTIONAL FLOAT float;
807 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
808 OPTIONAL BINARY string (UTF8);
809 OPTIONAL BINARY string_2 (STRING);
810 OPTIONAL BINARY json (JSON);
811 }
812 ";
813 let parquet_group_type = parse_message_type(message_type).unwrap();
814
815 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
816 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
817
818 let arrow_fields = Fields::from(vec![
819 Field::new("boolean", DataType::Boolean, false),
820 Field::new("int8", DataType::Int8, false),
821 Field::new("int16", DataType::Int16, false),
822 Field::new("uint8", DataType::UInt8, false),
823 Field::new("uint16", DataType::UInt16, false),
824 Field::new("int32", DataType::Int32, false),
825 Field::new("int64", DataType::Int64, false),
826 Field::new("double", DataType::Float64, true),
827 Field::new("float", DataType::Float32, true),
828 Field::new("float16", DataType::Float16, true),
829 Field::new("string", DataType::Utf8, true),
830 Field::new("string_2", DataType::Utf8, true),
831 Field::new("json", DataType::Utf8, true),
832 ]);
833
834 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
835 }
836
837 #[test]
838 fn test_decimal_fields() {
839 let message_type = "
840 message test_schema {
841 REQUIRED INT32 decimal1 (DECIMAL(4,2));
842 REQUIRED INT64 decimal2 (DECIMAL(12,2));
843 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
844 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
845 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
846 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
847 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
848 }
849 ";
850
851 let parquet_group_type = parse_message_type(message_type).unwrap();
852
853 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
854 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
855
856 let arrow_fields = Fields::from(vec![
857 Field::new("decimal1", DataType::Decimal128(4, 2), false),
858 Field::new("decimal2", DataType::Decimal128(12, 2), false),
859 Field::new("decimal3", DataType::Decimal128(30, 2), false),
860 Field::new("decimal4", DataType::Decimal128(33, 2), false),
861 Field::new("decimal5", DataType::Decimal128(38, 2), false),
862 Field::new("decimal6", DataType::Decimal256(39, 2), false),
863 Field::new("decimal7", DataType::Decimal256(39, 2), false),
864 ]);
865 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
866 }
867
868 #[test]
869 fn test_byte_array_fields() {
870 let message_type = "
871 message test_schema {
872 REQUIRED BYTE_ARRAY binary;
873 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
874 }
875 ";
876
877 let parquet_group_type = parse_message_type(message_type).unwrap();
878
879 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
880 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
881
882 let arrow_fields = Fields::from(vec![
883 Field::new("binary", DataType::Binary, false),
884 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
885 ]);
886 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
887 }
888
889 #[test]
890 fn test_duplicate_fields() {
891 let message_type = "
892 message test_schema {
893 REQUIRED BOOLEAN boolean;
894 REQUIRED INT32 int8 (INT_8);
895 }
896 ";
897
898 let parquet_group_type = parse_message_type(message_type).unwrap();
899
900 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
901 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
902
903 let arrow_fields = Fields::from(vec![
904 Field::new("boolean", DataType::Boolean, false),
905 Field::new("int8", DataType::Int8, false),
906 ]);
907 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
908
909 let converted_arrow_schema =
910 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
911 .unwrap();
912 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
913 }
914
915 #[test]
916 fn test_parquet_lists() {
917 let mut arrow_fields = Vec::new();
918
919 let message_type = "
921 message test_schema {
922 REQUIRED GROUP my_list (LIST) {
923 REPEATED GROUP list {
924 OPTIONAL BINARY element (UTF8);
925 }
926 }
927 OPTIONAL GROUP my_list (LIST) {
928 REPEATED GROUP list {
929 REQUIRED BINARY element (UTF8);
930 }
931 }
932 OPTIONAL GROUP array_of_arrays (LIST) {
933 REPEATED GROUP list {
934 REQUIRED GROUP element (LIST) {
935 REPEATED GROUP list {
936 REQUIRED INT32 element;
937 }
938 }
939 }
940 }
941 OPTIONAL GROUP my_list (LIST) {
942 REPEATED GROUP element {
943 REQUIRED BINARY str (UTF8);
944 }
945 }
946 OPTIONAL GROUP my_list (LIST) {
947 REPEATED INT32 element;
948 }
949 OPTIONAL GROUP my_list (LIST) {
950 REPEATED GROUP element {
951 REQUIRED BINARY str (UTF8);
952 REQUIRED INT32 num;
953 }
954 }
955 OPTIONAL GROUP my_list (LIST) {
956 REPEATED GROUP array {
957 REQUIRED BINARY str (UTF8);
958 }
959
960 }
961 OPTIONAL GROUP my_list (LIST) {
962 REPEATED GROUP my_list_tuple {
963 REQUIRED BINARY str (UTF8);
964 }
965 }
966 REPEATED INT32 name;
967 }
968 ";
969
970 {
977 arrow_fields.push(Field::new_list(
978 "my_list",
979 Field::new("element", DataType::Utf8, true),
980 false,
981 ));
982 }
983
984 {
991 arrow_fields.push(Field::new_list(
992 "my_list",
993 Field::new("element", DataType::Utf8, false),
994 true,
995 ));
996 }
997
998 {
1011 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1012 arrow_fields.push(Field::new_list(
1013 "array_of_arrays",
1014 Field::new_list("element", arrow_inner_list, false),
1015 true,
1016 ));
1017 }
1018
1019 {
1026 arrow_fields.push(Field::new_list(
1027 "my_list",
1028 Field::new("str", DataType::Utf8, false),
1029 true,
1030 ));
1031 }
1032
1033 {
1038 arrow_fields.push(Field::new_list(
1039 "my_list",
1040 Field::new("element", DataType::Int32, false),
1041 true,
1042 ));
1043 }
1044
1045 {
1053 let fields = vec![
1054 Field::new("str", DataType::Utf8, false),
1055 Field::new("num", DataType::Int32, false),
1056 ];
1057 arrow_fields.push(Field::new_list(
1058 "my_list",
1059 Field::new_struct("element", fields, false),
1060 true,
1061 ));
1062 }
1063
1064 {
1072 let fields = vec![Field::new("str", DataType::Utf8, false)];
1073 arrow_fields.push(Field::new_list(
1074 "my_list",
1075 Field::new_struct("array", fields, false),
1076 true,
1077 ));
1078 }
1079
1080 {
1088 let fields = vec![Field::new("str", DataType::Utf8, false)];
1089 arrow_fields.push(Field::new_list(
1090 "my_list",
1091 Field::new_struct("my_list_tuple", fields, false),
1092 true,
1093 ));
1094 }
1095
1096 {
1099 arrow_fields.push(Field::new_list(
1100 "name",
1101 Field::new("name", DataType::Int32, false),
1102 false,
1103 ));
1104 }
1105
1106 let parquet_group_type = parse_message_type(message_type).unwrap();
1107
1108 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1109 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1110 let converted_fields = converted_arrow_schema.fields();
1111
1112 assert_eq!(arrow_fields.len(), converted_fields.len());
1113 for i in 0..arrow_fields.len() {
1114 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1115 }
1116 }
1117
1118 #[test]
1119 fn test_parquet_list_nullable() {
1120 let mut arrow_fields = Vec::new();
1121
1122 let message_type = "
1123 message test_schema {
1124 REQUIRED GROUP my_list1 (LIST) {
1125 REPEATED GROUP list {
1126 OPTIONAL BINARY element (UTF8);
1127 }
1128 }
1129 OPTIONAL GROUP my_list2 (LIST) {
1130 REPEATED GROUP list {
1131 REQUIRED BINARY element (UTF8);
1132 }
1133 }
1134 REQUIRED GROUP my_list3 (LIST) {
1135 REPEATED GROUP list {
1136 REQUIRED BINARY element (UTF8);
1137 }
1138 }
1139 }
1140 ";
1141
1142 {
1149 arrow_fields.push(Field::new_list(
1150 "my_list1",
1151 Field::new("element", DataType::Utf8, true),
1152 false,
1153 ));
1154 }
1155
1156 {
1163 arrow_fields.push(Field::new_list(
1164 "my_list2",
1165 Field::new("element", DataType::Utf8, false),
1166 true,
1167 ));
1168 }
1169
1170 {
1177 arrow_fields.push(Field::new_list(
1178 "my_list3",
1179 Field::new("element", DataType::Utf8, false),
1180 false,
1181 ));
1182 }
1183
1184 let parquet_group_type = parse_message_type(message_type).unwrap();
1185
1186 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1187 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1188 let converted_fields = converted_arrow_schema.fields();
1189
1190 assert_eq!(arrow_fields.len(), converted_fields.len());
1191 for i in 0..arrow_fields.len() {
1192 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1193 }
1194 }
1195
1196 #[test]
1197 fn test_parquet_maps() {
1198 let mut arrow_fields = Vec::new();
1199
1200 let message_type = "
1202 message test_schema {
1203 REQUIRED group my_map1 (MAP) {
1204 REPEATED group key_value {
1205 REQUIRED binary key (UTF8);
1206 OPTIONAL int32 value;
1207 }
1208 }
1209 OPTIONAL group my_map2 (MAP) {
1210 REPEATED group map {
1211 REQUIRED binary str (UTF8);
1212 REQUIRED int32 num;
1213 }
1214 }
1215 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1216 REPEATED group map {
1217 REQUIRED binary key (UTF8);
1218 OPTIONAL int32 value;
1219 }
1220 }
1221 REQUIRED group my_map4 (MAP) {
1222 REPEATED group map {
1223 OPTIONAL binary key (UTF8);
1224 REQUIRED int32 value;
1225 }
1226 }
1227 }
1228 ";
1229
1230 {
1238 arrow_fields.push(Field::new_map(
1239 "my_map1",
1240 "key_value",
1241 Field::new("key", DataType::Utf8, false),
1242 Field::new("value", DataType::Int32, true),
1243 false,
1244 false,
1245 ));
1246 }
1247
1248 {
1256 arrow_fields.push(Field::new_map(
1257 "my_map2",
1258 "map",
1259 Field::new("str", DataType::Utf8, false),
1260 Field::new("num", DataType::Int32, false),
1261 false,
1262 true,
1263 ));
1264 }
1265
1266 {
1274 arrow_fields.push(Field::new_map(
1275 "my_map3",
1276 "map",
1277 Field::new("key", DataType::Utf8, false),
1278 Field::new("value", DataType::Int32, true),
1279 false,
1280 true,
1281 ));
1282 }
1283
1284 {
1292 arrow_fields.push(Field::new_map(
1293 "my_map4",
1294 "map",
1295 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1297 false,
1298 false,
1299 ));
1300 }
1301
1302 let parquet_group_type = parse_message_type(message_type).unwrap();
1303
1304 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1305 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1306 let converted_fields = converted_arrow_schema.fields();
1307
1308 assert_eq!(arrow_fields.len(), converted_fields.len());
1309 for i in 0..arrow_fields.len() {
1310 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1311 }
1312 }
1313
1314 #[test]
1315 fn test_nested_schema() {
1316 let mut arrow_fields = Vec::new();
1317 {
1318 let group1_fields = Fields::from(vec![
1319 Field::new("leaf1", DataType::Boolean, false),
1320 Field::new("leaf2", DataType::Int32, false),
1321 ]);
1322 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1323 arrow_fields.push(group1_struct);
1324
1325 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1326 arrow_fields.push(leaf3_field);
1327 }
1328
1329 let message_type = "
1330 message test_schema {
1331 REQUIRED GROUP group1 {
1332 REQUIRED BOOLEAN leaf1;
1333 REQUIRED INT32 leaf2;
1334 }
1335 REQUIRED INT64 leaf3;
1336 }
1337 ";
1338 let parquet_group_type = parse_message_type(message_type).unwrap();
1339
1340 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1341 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1342 let converted_fields = converted_arrow_schema.fields();
1343
1344 assert_eq!(arrow_fields.len(), converted_fields.len());
1345 for i in 0..arrow_fields.len() {
1346 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1347 }
1348 }
1349
1350 #[test]
1351 fn test_nested_schema_partial() {
1352 let mut arrow_fields = Vec::new();
1353 {
1354 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1355 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1356 arrow_fields.push(group1);
1357
1358 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1359 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1360 arrow_fields.push(group2);
1361
1362 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1363 }
1364
1365 let message_type = "
1366 message test_schema {
1367 REQUIRED GROUP group1 {
1368 REQUIRED INT64 leaf1;
1369 REQUIRED INT64 leaf2;
1370 }
1371 REQUIRED GROUP group2 {
1372 REQUIRED INT64 leaf3;
1373 REQUIRED INT64 leaf4;
1374 }
1375 REQUIRED INT64 leaf5;
1376 }
1377 ";
1378 let parquet_group_type = parse_message_type(message_type).unwrap();
1379
1380 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1390 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1391 let converted_arrow_schema =
1392 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1393 let converted_fields = converted_arrow_schema.fields();
1394
1395 assert_eq!(arrow_fields.len(), converted_fields.len());
1396 for i in 0..arrow_fields.len() {
1397 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1398 }
1399 }
1400
1401 #[test]
1402 fn test_nested_schema_partial_ordering() {
1403 let mut arrow_fields = Vec::new();
1404 {
1405 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1406 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1407 arrow_fields.push(group1);
1408
1409 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1410 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1411 arrow_fields.push(group2);
1412
1413 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1414 }
1415
1416 let message_type = "
1417 message test_schema {
1418 REQUIRED GROUP group1 {
1419 REQUIRED INT64 leaf1;
1420 REQUIRED INT64 leaf2;
1421 }
1422 REQUIRED GROUP group2 {
1423 REQUIRED INT64 leaf3;
1424 REQUIRED INT64 leaf4;
1425 }
1426 REQUIRED INT64 leaf5;
1427 }
1428 ";
1429 let parquet_group_type = parse_message_type(message_type).unwrap();
1430
1431 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1441 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1442 let converted_arrow_schema =
1443 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1444 let converted_fields = converted_arrow_schema.fields();
1445
1446 assert_eq!(arrow_fields.len(), converted_fields.len());
1447 for i in 0..arrow_fields.len() {
1448 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1449 }
1450
1451 let mask =
1452 ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1453 let converted_arrow_schema =
1454 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1455 let converted_fields = converted_arrow_schema.fields();
1456
1457 assert_eq!(arrow_fields.len(), converted_fields.len());
1458 for i in 0..arrow_fields.len() {
1459 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1460 }
1461 }
1462
1463 #[test]
1464 fn test_repeated_nested_schema() {
1465 let mut arrow_fields = Vec::new();
1466 {
1467 arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1468
1469 let inner_group_list = Field::new_list(
1470 "innerGroup",
1471 Field::new_struct(
1472 "innerGroup",
1473 vec![Field::new("leaf3", DataType::Int32, true)],
1474 false,
1475 ),
1476 false,
1477 );
1478
1479 let outer_group_list = Field::new_list(
1480 "outerGroup",
1481 Field::new_struct(
1482 "outerGroup",
1483 vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1484 false,
1485 ),
1486 false,
1487 );
1488 arrow_fields.push(outer_group_list);
1489 }
1490
1491 let message_type = "
1492 message test_schema {
1493 OPTIONAL INT32 leaf1;
1494 REPEATED GROUP outerGroup {
1495 OPTIONAL INT32 leaf2;
1496 REPEATED GROUP innerGroup {
1497 OPTIONAL INT32 leaf3;
1498 }
1499 }
1500 }
1501 ";
1502 let parquet_group_type = parse_message_type(message_type).unwrap();
1503
1504 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1505 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1506 let converted_fields = converted_arrow_schema.fields();
1507
1508 assert_eq!(arrow_fields.len(), converted_fields.len());
1509 for i in 0..arrow_fields.len() {
1510 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1511 }
1512 }
1513
1514 #[test]
1515 fn test_column_desc_to_field() {
1516 let message_type = "
1517 message test_schema {
1518 REQUIRED BOOLEAN boolean;
1519 REQUIRED INT32 int8 (INT_8);
1520 REQUIRED INT32 uint8 (INTEGER(8,false));
1521 REQUIRED INT32 int16 (INT_16);
1522 REQUIRED INT32 uint16 (INTEGER(16,false));
1523 REQUIRED INT32 int32;
1524 REQUIRED INT64 int64;
1525 OPTIONAL DOUBLE double;
1526 OPTIONAL FLOAT float;
1527 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1528 OPTIONAL BINARY string (UTF8);
1529 REPEATED BOOLEAN bools;
1530 OPTIONAL INT32 date (DATE);
1531 OPTIONAL INT32 time_milli (TIME_MILLIS);
1532 OPTIONAL INT64 time_micro (TIME_MICROS);
1533 OPTIONAL INT64 time_nano (TIME(NANOS,false));
1534 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1535 REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1536 REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1537 REPEATED INT32 int_list;
1538 REPEATED BINARY byte_list;
1539 REPEATED BINARY string_list (UTF8);
1540 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1541 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1542 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1543 }
1544 ";
1545 let parquet_group_type = parse_message_type(message_type).unwrap();
1546
1547 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1548 let converted_arrow_fields = parquet_schema
1549 .columns()
1550 .iter()
1551 .map(|c| parquet_to_arrow_field(c).unwrap())
1552 .collect::<Vec<Field>>();
1553
1554 let arrow_fields = vec![
1555 Field::new("boolean", DataType::Boolean, false),
1556 Field::new("int8", DataType::Int8, false),
1557 Field::new("uint8", DataType::UInt8, false),
1558 Field::new("int16", DataType::Int16, false),
1559 Field::new("uint16", DataType::UInt16, false),
1560 Field::new("int32", DataType::Int32, false),
1561 Field::new("int64", DataType::Int64, false),
1562 Field::new("double", DataType::Float64, true),
1563 Field::new("float", DataType::Float32, true),
1564 Field::new("float16", DataType::Float16, true),
1565 Field::new("string", DataType::Utf8, true),
1566 Field::new_list(
1567 "bools",
1568 Field::new("bools", DataType::Boolean, false),
1569 false,
1570 ),
1571 Field::new("date", DataType::Date32, true),
1572 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1573 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1574 Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1575 Field::new(
1576 "ts_milli",
1577 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1578 true,
1579 ),
1580 Field::new(
1581 "ts_micro",
1582 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1583 false,
1584 ),
1585 Field::new(
1586 "ts_nano",
1587 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1588 false,
1589 ),
1590 Field::new_list(
1591 "int_list",
1592 Field::new("int_list", DataType::Int32, false),
1593 false,
1594 ),
1595 Field::new_list(
1596 "byte_list",
1597 Field::new("byte_list", DataType::Binary, false),
1598 false,
1599 ),
1600 Field::new_list(
1601 "string_list",
1602 Field::new("string_list", DataType::Utf8, false),
1603 false,
1604 ),
1605 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1606 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1607 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1608 ];
1609
1610 assert_eq!(arrow_fields, converted_arrow_fields);
1611 }
1612
1613 #[test]
1614 fn test_coerced_map_list() {
1615 let arrow_fields = vec![
1617 Field::new_list(
1618 "my_list",
1619 Field::new("item", DataType::Boolean, true),
1620 false,
1621 ),
1622 Field::new_map(
1623 "my_map",
1624 "entries",
1625 Field::new("keys", DataType::Utf8, false),
1626 Field::new("values", DataType::Int32, true),
1627 false,
1628 true,
1629 ),
1630 ];
1631 let arrow_schema = Schema::new(arrow_fields);
1632
1633 let message_type = "
1635 message parquet_schema {
1636 REQUIRED GROUP my_list (LIST) {
1637 REPEATED GROUP list {
1638 OPTIONAL BOOLEAN element;
1639 }
1640 }
1641 OPTIONAL GROUP my_map (MAP) {
1642 REPEATED GROUP key_value {
1643 REQUIRED BINARY key (STRING);
1644 OPTIONAL INT32 value;
1645 }
1646 }
1647 }
1648 ";
1649 let parquet_group_type = parse_message_type(message_type).unwrap();
1650 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1651 let converted_arrow_schema = ArrowSchemaConverter::new()
1652 .with_coerce_types(true)
1653 .convert(&arrow_schema)
1654 .unwrap();
1655 assert_eq!(
1656 parquet_schema.columns().len(),
1657 converted_arrow_schema.columns().len()
1658 );
1659
1660 let message_type = "
1662 message parquet_schema {
1663 REQUIRED GROUP my_list (LIST) {
1664 REPEATED GROUP list {
1665 OPTIONAL BOOLEAN item;
1666 }
1667 }
1668 OPTIONAL GROUP my_map (MAP) {
1669 REPEATED GROUP entries {
1670 REQUIRED BINARY keys (STRING);
1671 OPTIONAL INT32 values;
1672 }
1673 }
1674 }
1675 ";
1676 let parquet_group_type = parse_message_type(message_type).unwrap();
1677 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1678 let converted_arrow_schema = ArrowSchemaConverter::new()
1679 .with_coerce_types(false)
1680 .convert(&arrow_schema)
1681 .unwrap();
1682 assert_eq!(
1683 parquet_schema.columns().len(),
1684 converted_arrow_schema.columns().len()
1685 );
1686 }
1687
1688 #[test]
1689 fn test_field_to_column_desc() {
1690 let message_type = "
1691 message arrow_schema {
1692 REQUIRED BOOLEAN boolean;
1693 REQUIRED INT32 int8 (INT_8);
1694 REQUIRED INT32 int16 (INTEGER(16,true));
1695 REQUIRED INT32 int32;
1696 REQUIRED INT64 int64;
1697 OPTIONAL DOUBLE double;
1698 OPTIONAL FLOAT float;
1699 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1700 OPTIONAL BINARY string (STRING);
1701 OPTIONAL GROUP bools (LIST) {
1702 REPEATED GROUP list {
1703 OPTIONAL BOOLEAN element;
1704 }
1705 }
1706 REQUIRED GROUP bools_non_null (LIST) {
1707 REPEATED GROUP list {
1708 REQUIRED BOOLEAN element;
1709 }
1710 }
1711 OPTIONAL INT32 date (DATE);
1712 OPTIONAL INT32 time_milli (TIME(MILLIS,false));
1713 OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
1714 OPTIONAL INT64 time_micro (TIME_MICROS);
1715 OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
1716 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1717 REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
1718 REQUIRED INT64 ts_seconds;
1719 REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
1720 REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1721 REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1722 REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
1723 REQUIRED GROUP struct {
1724 REQUIRED BOOLEAN bools;
1725 REQUIRED INT32 uint32 (INTEGER(32,false));
1726 REQUIRED GROUP int32 (LIST) {
1727 REPEATED GROUP list {
1728 OPTIONAL INT32 element;
1729 }
1730 }
1731 }
1732 REQUIRED BINARY dictionary_strings (STRING);
1733 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1734 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1735 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1736 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1737 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1738 }
1739 ";
1740 let parquet_group_type = parse_message_type(message_type).unwrap();
1741
1742 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1743
1744 let arrow_fields = vec![
1745 Field::new("boolean", DataType::Boolean, false),
1746 Field::new("int8", DataType::Int8, false),
1747 Field::new("int16", DataType::Int16, false),
1748 Field::new("int32", DataType::Int32, false),
1749 Field::new("int64", DataType::Int64, false),
1750 Field::new("double", DataType::Float64, true),
1751 Field::new("float", DataType::Float32, true),
1752 Field::new("float16", DataType::Float16, true),
1753 Field::new("string", DataType::Utf8, true),
1754 Field::new_list(
1755 "bools",
1756 Field::new("element", DataType::Boolean, true),
1757 true,
1758 ),
1759 Field::new_list(
1760 "bools_non_null",
1761 Field::new("element", DataType::Boolean, false),
1762 false,
1763 ),
1764 Field::new("date", DataType::Date32, true),
1765 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1766 Field::new(
1767 "time_milli_utc",
1768 DataType::Time32(TimeUnit::Millisecond),
1769 true,
1770 )
1771 .with_metadata(HashMap::from_iter(vec![(
1772 "adjusted_to_utc".to_string(),
1773 "".to_string(),
1774 )])),
1775 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1776 Field::new(
1777 "time_micro_utc",
1778 DataType::Time64(TimeUnit::Microsecond),
1779 true,
1780 )
1781 .with_metadata(HashMap::from_iter(vec![(
1782 "adjusted_to_utc".to_string(),
1783 "".to_string(),
1784 )])),
1785 Field::new(
1786 "ts_milli",
1787 DataType::Timestamp(TimeUnit::Millisecond, None),
1788 true,
1789 ),
1790 Field::new(
1791 "ts_micro",
1792 DataType::Timestamp(TimeUnit::Microsecond, None),
1793 false,
1794 ),
1795 Field::new(
1796 "ts_seconds",
1797 DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1798 false,
1799 ),
1800 Field::new(
1801 "ts_micro_utc",
1802 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1803 false,
1804 ),
1805 Field::new(
1806 "ts_millis_zero_offset",
1807 DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1808 false,
1809 ),
1810 Field::new(
1811 "ts_millis_zero_negative_offset",
1812 DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1813 false,
1814 ),
1815 Field::new(
1816 "ts_micro_non_utc",
1817 DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1818 false,
1819 ),
1820 Field::new_struct(
1821 "struct",
1822 vec![
1823 Field::new("bools", DataType::Boolean, false),
1824 Field::new("uint32", DataType::UInt32, false),
1825 Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1826 ],
1827 false,
1828 ),
1829 Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1830 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1831 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1832 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1833 Field::new("decimal128", DataType::Decimal128(38, 2), false),
1834 Field::new("decimal256", DataType::Decimal256(39, 2), false),
1835 ];
1836 let arrow_schema = Schema::new(arrow_fields);
1837 let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1838
1839 assert_eq!(
1840 parquet_schema.columns().len(),
1841 converted_arrow_schema.columns().len()
1842 );
1843 parquet_schema
1844 .columns()
1845 .iter()
1846 .zip(converted_arrow_schema.columns())
1847 .for_each(|(a, b)| {
1848 match a.logical_type() {
1853 Some(_) => {
1854 assert_eq!(a, b)
1855 }
1856 None => {
1857 assert_eq!(a.name(), b.name());
1858 assert_eq!(a.physical_type(), b.physical_type());
1859 assert_eq!(a.converted_type(), b.converted_type());
1860 }
1861 };
1862 });
1863 }
1864
1865 #[test]
1866 #[should_panic(expected = "Parquet does not support writing empty structs")]
1867 fn test_empty_struct_field() {
1868 let arrow_fields = vec![Field::new(
1869 "struct",
1870 DataType::Struct(Fields::empty()),
1871 false,
1872 )];
1873 let arrow_schema = Schema::new(arrow_fields);
1874 let converted_arrow_schema = ArrowSchemaConverter::new()
1875 .with_coerce_types(true)
1876 .convert(&arrow_schema);
1877
1878 converted_arrow_schema.unwrap();
1879 }
1880
1881 #[test]
1882 fn test_metadata() {
1883 let message_type = "
1884 message test_schema {
1885 OPTIONAL BINARY string (STRING);
1886 }
1887 ";
1888 let parquet_group_type = parse_message_type(message_type).unwrap();
1889
1890 let key_value_metadata = vec![
1891 KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1892 KeyValue::new("baz".to_owned(), None),
1893 ];
1894
1895 let mut expected_metadata: HashMap<String, String> = HashMap::new();
1896 expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1897
1898 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1899 let converted_arrow_schema =
1900 parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1901
1902 assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1903 }
1904
1905 #[test]
1906 fn test_arrow_schema_roundtrip() -> Result<()> {
1907 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1908 a.iter()
1909 .map(|(a, b)| (a.to_string(), b.to_string()))
1910 .collect()
1911 };
1912
1913 let schema = Schema::new_with_metadata(
1914 vec![
1915 Field::new("c1", DataType::Utf8, false)
1916 .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1917 Field::new("c2", DataType::Binary, false),
1918 Field::new("c3", DataType::FixedSizeBinary(3), false),
1919 Field::new("c4", DataType::Boolean, false),
1920 Field::new("c5", DataType::Date32, false),
1921 Field::new("c6", DataType::Date64, false),
1922 Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1923 Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1924 Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1925 Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1926 Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1927 Field::new(
1928 "c16",
1929 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1930 false,
1931 ),
1932 Field::new(
1933 "c17",
1934 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1935 false,
1936 ),
1937 Field::new(
1938 "c18",
1939 DataType::Timestamp(TimeUnit::Nanosecond, None),
1940 false,
1941 ),
1942 Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1943 Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1944 Field::new_list(
1945 "c21",
1946 Field::new_list_field(DataType::Boolean, true)
1947 .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1948 false,
1949 )
1950 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1951 Field::new(
1952 "c22",
1953 DataType::FixedSizeList(
1954 Arc::new(Field::new_list_field(DataType::Boolean, true)),
1955 5,
1956 ),
1957 false,
1958 ),
1959 Field::new_list(
1960 "c23",
1961 Field::new_large_list(
1962 "inner",
1963 Field::new_list_field(
1964 DataType::Struct(
1965 vec![
1966 Field::new("a", DataType::Int16, true),
1967 Field::new("b", DataType::Float64, false),
1968 Field::new("c", DataType::Float32, false),
1969 Field::new("d", DataType::Float16, false),
1970 ]
1971 .into(),
1972 ),
1973 false,
1974 ),
1975 true,
1976 ),
1977 false,
1978 ),
1979 Field::new(
1980 "c24",
1981 DataType::Struct(Fields::from(vec![
1982 Field::new("a", DataType::Utf8, false),
1983 Field::new("b", DataType::UInt16, false),
1984 ])),
1985 false,
1986 ),
1987 Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
1988 Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
1989 #[allow(deprecated)]
1995 Field::new_dict(
1996 "c31",
1997 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1998 true,
1999 123,
2000 true,
2001 )
2002 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2003 Field::new("c32", DataType::LargeBinary, true),
2004 Field::new("c33", DataType::LargeUtf8, true),
2005 Field::new_large_list(
2006 "c34",
2007 Field::new_list(
2008 "inner",
2009 Field::new_list_field(
2010 DataType::Struct(
2011 vec![
2012 Field::new("a", DataType::Int16, true),
2013 Field::new("b", DataType::Float64, true),
2014 ]
2015 .into(),
2016 ),
2017 true,
2018 ),
2019 true,
2020 ),
2021 true,
2022 ),
2023 Field::new("c35", DataType::Null, true),
2024 Field::new("c36", DataType::Decimal128(2, 1), false),
2025 Field::new("c37", DataType::Decimal256(50, 20), false),
2026 Field::new("c38", DataType::Decimal128(18, 12), true),
2027 Field::new_map(
2028 "c39",
2029 "key_value",
2030 Field::new("key", DataType::Utf8, false),
2031 Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2032 false, true,
2034 ),
2035 Field::new_map(
2036 "c40",
2037 "my_entries",
2038 Field::new("my_key", DataType::Utf8, false)
2039 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2040 Field::new_list(
2041 "my_value",
2042 Field::new_list_field(DataType::Utf8, true)
2043 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2044 true,
2045 )
2046 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2047 false, true,
2049 )
2050 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2051 Field::new_map(
2052 "c41",
2053 "my_entries",
2054 Field::new("my_key", DataType::Utf8, false),
2055 Field::new_list(
2056 "my_value",
2057 Field::new_list_field(DataType::Utf8, true)
2058 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2059 true,
2060 ),
2061 false, false,
2063 ),
2064 ],
2065 meta(&[("Key", "Value")]),
2066 );
2067
2068 let file = tempfile::tempfile().unwrap();
2070 let writer =
2071 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2072 writer.close()?;
2073
2074 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2076
2077 let read_schema = arrow_reader.schema();
2079 assert_eq!(&schema, read_schema.as_ref());
2080
2081 let mut stack = Vec::with_capacity(10);
2083 let mut out = Vec::with_capacity(10);
2084
2085 let root = arrow_reader.parquet_schema().root_schema_ptr();
2086 stack.push((root.name().to_string(), root));
2087
2088 while let Some((p, t)) = stack.pop() {
2089 if t.is_group() {
2090 for f in t.get_fields() {
2091 stack.push((format!("{p}.{}", f.name()), f.clone()))
2092 }
2093 }
2094
2095 let info = t.get_basic_info();
2096 if info.has_id() {
2097 out.push(format!("{p} -> {}", info.id()))
2098 }
2099 }
2100 out.sort_unstable();
2101 let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2102
2103 assert_eq!(
2104 &out,
2105 &[
2106 "arrow_schema.c1 -> 2",
2107 "arrow_schema.c21 -> 4",
2108 "arrow_schema.c21.list.item -> 5",
2109 "arrow_schema.c31 -> 6",
2110 "arrow_schema.c40 -> 7",
2111 "arrow_schema.c40.my_entries.my_key -> 8",
2112 "arrow_schema.c40.my_entries.my_value -> 9",
2113 "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2114 "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2115 ]
2116 );
2117
2118 Ok(())
2119 }
2120
2121 #[test]
2122 fn test_read_parquet_field_ids_raw() -> Result<()> {
2123 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2124 a.iter()
2125 .map(|(a, b)| (a.to_string(), b.to_string()))
2126 .collect()
2127 };
2128 let schema = Schema::new_with_metadata(
2129 vec![
2130 Field::new("c1", DataType::Utf8, true)
2131 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2132 Field::new("c2", DataType::Utf8, true)
2133 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2134 ],
2135 HashMap::new(),
2136 );
2137
2138 let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2139 let parquet_bytes = writer.into_inner()?;
2140
2141 let reader =
2142 crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2143 let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2144
2145 let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2147
2148 let parq_schema_descr = ArrowSchemaConverter::new()
2149 .with_coerce_types(true)
2150 .convert(&arrow_schema)?;
2151 let parq_fields = parq_schema_descr.root_schema().get_fields();
2152 assert_eq!(parq_fields.len(), 2);
2153 assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2154 assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2155
2156 Ok(())
2157 }
2158
2159 #[test]
2160 fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2161 let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2162 .iter()
2163 .cloned()
2164 .collect();
2165
2166 let schema = Schema::new_with_metadata(
2167 vec![
2168 Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2169 Field::new(
2170 "c22",
2171 DataType::FixedSizeList(
2172 Arc::new(Field::new("items", DataType::Boolean, false)),
2173 5,
2174 ),
2175 false,
2176 ),
2177 Field::new_list(
2178 "c23",
2179 Field::new_large_list(
2180 "items",
2181 Field::new_struct(
2182 "items",
2183 vec![
2184 Field::new("a", DataType::Int16, true),
2185 Field::new("b", DataType::Float64, false),
2186 ],
2187 true,
2188 ),
2189 true,
2190 ),
2191 true,
2192 ),
2193 ],
2194 metadata,
2195 );
2196
2197 let file = tempfile::tempfile().unwrap();
2199 let writer =
2200 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2201 writer.close()?;
2202
2203 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2205 let read_schema = arrow_reader.schema();
2206 assert_eq!(&schema, read_schema.as_ref());
2207 Ok(())
2208 }
2209
2210 #[test]
2211 fn test_get_arrow_schema_from_metadata() {
2212 assert!(get_arrow_schema_from_metadata("").is_err());
2213 }
2214
2215 #[test]
2216 #[cfg(feature = "arrow_canonical_extension_types")]
2217 fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2218 let arrow_schema = Schema::new(vec![Field::new(
2219 "uuid",
2220 DataType::FixedSizeBinary(16),
2221 false,
2222 )
2223 .with_extension_type(Uuid)]);
2224
2225 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2226
2227 assert_eq!(
2228 parquet_schema.column(0).logical_type(),
2229 Some(LogicalType::Uuid)
2230 );
2231
2232 Ok(())
2237 }
2238
2239 #[test]
2240 #[cfg(feature = "arrow_canonical_extension_types")]
2241 fn arrow_json_to_parquet_json() -> Result<()> {
2242 let arrow_schema = Schema::new(vec![
2243 Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
2244 ]);
2245
2246 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2247
2248 assert_eq!(
2249 parquet_schema.column(0).logical_type(),
2250 Some(LogicalType::Json)
2251 );
2252
2253 Ok(())
2262 }
2263}