1use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43 has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44 logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45 try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49pub fn parquet_to_arrow_schema(
54 parquet_schema: &SchemaDescriptor,
55 key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60pub fn parquet_to_arrow_schema_by_columns(
63 parquet_schema: &SchemaDescriptor,
64 mask: ProjectionMask,
65 key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70pub(crate) fn parquet_to_arrow_schema_and_fields(
76 parquet_schema: &SchemaDescriptor,
77 mask: ProjectionMask,
78 key_value_metadata: Option<&Vec<KeyValue>>,
79 virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82 let maybe_schema = metadata
83 .remove(super::ARROW_SCHEMA_META_KEY)
84 .map(|value| get_arrow_schema_from_metadata(&value))
85 .transpose()?;
86
87 if let Some(arrow_schema) = &maybe_schema {
89 arrow_schema.metadata().iter().for_each(|(k, v)| {
90 metadata.entry(k.clone()).or_insert_with(|| v.clone());
91 });
92 }
93
94 let hint = maybe_schema.as_ref().map(|s| s.fields());
95 let field_levels =
96 parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98 Ok((schema, field_levels.levels))
99}
100
/// Arrow [`Fields`] together with the [`ParquetField`] tree produced alongside
/// them by [`parquet_to_arrow_field_levels`] and
/// [`parquet_to_arrow_field_levels_with_virtual`].
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Arrow fields of the converted (projected) columns.
    pub(crate) fields: Fields,
    /// Root [`ParquetField`] of the conversion; `None` when the schema
    /// conversion produced no root field and no virtual columns were requested.
    pub(crate) levels: Option<ParquetField>,
}
113
114pub fn parquet_to_arrow_field_levels(
135 schema: &SchemaDescriptor,
136 mask: ProjectionMask,
137 hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139 parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142pub fn parquet_to_arrow_field_levels_with_virtual(
172 schema: &SchemaDescriptor,
173 mask: ProjectionMask,
174 hint: Option<&Fields>,
175 virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177 for field in virtual_columns {
179 if !virtual_type::is_virtual_column(field) {
180 return Err(ParquetError::General(format!(
181 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182 field.name()
183 )));
184 }
185 }
186
187 let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189 Some(field) => field,
190 None if virtual_columns.is_empty() => {
191 return Ok(FieldLevels {
192 fields: Fields::empty(),
193 levels: None,
194 });
195 }
196 None => {
197 ParquetField {
199 rep_level: 0,
200 def_level: 0,
201 nullable: false,
202 arrow_type: DataType::Struct(Fields::empty()),
203 field_type: ParquetFieldType::Group {
204 children: Vec::new(),
205 },
206 }
207 }
208 };
209
210 if !virtual_columns.is_empty() {
212 match &mut parquet_field.field_type {
213 ParquetFieldType::Group { children } => {
214 let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216 unreachable!("Root field must be a struct");
217 };
218
219 let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222 for virtual_column in virtual_columns {
224 assert_eq!(
226 parquet_field.rep_level, 0,
227 "Virtual columns can only be added at rep level 0"
228 );
229 assert_eq!(
230 parquet_field.def_level, 0,
231 "Virtual columns can only be added at def level 0"
232 );
233
234 fields_vec.push(virtual_column.clone());
235 let virtual_parquet_field = complex::convert_virtual_field(
236 virtual_column,
237 parquet_field.rep_level,
238 parquet_field.def_level,
239 )?;
240 children.push(virtual_parquet_field);
241 }
242
243 parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245 }
246 _ => unreachable!("Root field must be a group"),
247 }
248 }
249
250 match &parquet_field.arrow_type {
251 DataType::Struct(fields) => Ok(FieldLevels {
252 fields: fields.clone(),
253 levels: Some(parquet_field),
254 }),
255 _ => unreachable!(),
256 }
257}
258
259fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261 let decoded = BASE64_STANDARD.decode(encoded_meta);
262 match decoded {
263 Ok(bytes) => {
264 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265 &bytes[8..]
266 } else {
267 bytes.as_slice()
268 };
269 match arrow_ipc::root_as_message(slice) {
270 Ok(message) => message
271 .header_as_schema()
272 .map(arrow_ipc::convert::fb_to_schema)
273 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274 Err(err) => {
275 Err(arrow_err!(
277 "Unable to get root as message stored in {}: {:?}",
278 super::ARROW_SCHEMA_META_KEY,
279 err
280 ))
281 }
282 }
283 }
284 Err(err) => {
285 Err(arrow_err!(
287 "Unable to decode the encoded schema stored in {}, {:?}",
288 super::ARROW_SCHEMA_META_KEY,
289 err
290 ))
291 }
292 }
293}
294
295pub fn encode_arrow_schema(schema: &Schema) -> String {
297 let options = writer::IpcWriteOptions::default();
298 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299 let data_gen = writer::IpcDataGenerator::default();
300 let mut serialized_schema =
301 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303 let schema_len = serialized_schema.ipc_message.len();
306 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309 len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311 BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314fn flatten_ree_field(field: &Field) -> Field {
315 match field.data_type() {
316 DataType::RunEndEncoded(_, value_field) => field
317 .clone()
318 .with_data_type(value_field.data_type().clone()),
319 _ => field.clone(),
320 }
321}
322
323pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
330 let has_ree = schema
331 .fields()
332 .iter()
333 .any(|f| matches!(f.data_type(), DataType::RunEndEncoded(_, _)));
334 let flat_schema;
335 let schema = if has_ree {
336 let flat_fields: Vec<Field> = schema
337 .fields()
338 .iter()
339 .map(|f| flatten_ree_field(f))
340 .collect();
341 flat_schema = Schema::new_with_metadata(flat_fields, schema.metadata().clone());
342 &flat_schema
343 } else {
344 schema
345 };
346 let encoded = encode_arrow_schema(schema);
347
348 let schema_kv = KeyValue {
349 key: super::ARROW_SCHEMA_META_KEY.to_string(),
350 value: Some(encoded),
351 };
352
353 let meta = props
354 .key_value_metadata
355 .get_or_insert_with(Default::default);
356
357 let schema_meta = meta
359 .iter()
360 .enumerate()
361 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
362 match schema_meta {
363 Some((i, _)) => {
364 meta.remove(i);
365 meta.push(schema_kv);
366 }
367 None => {
368 meta.push(schema_kv);
369 }
370 }
371}
372
373#[derive(Debug)]
418pub struct ArrowSchemaConverter<'a> {
419 schema_root: &'a str,
421 coerce_types: bool,
425}
426
427impl Default for ArrowSchemaConverter<'_> {
428 fn default() -> Self {
429 Self::new()
430 }
431}
432
433impl<'a> ArrowSchemaConverter<'a> {
434 pub fn new() -> Self {
436 Self {
437 schema_root: "arrow_schema",
438 coerce_types: false,
439 }
440 }
441
442 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
473 self.coerce_types = coerce_types;
474 self
475 }
476
477 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
479 self.schema_root = schema_root;
480 self
481 }
482
483 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
487 let fields = schema
488 .fields()
489 .iter()
490 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
491 .collect::<Result<_>>()?;
492 let group = Type::group_type_builder(self.schema_root)
493 .with_fields(fields)
494 .build()?;
495 Ok(SchemaDescriptor::new(Arc::new(group)))
496 }
497}
498
499fn parse_key_value_metadata(
500 key_value_metadata: Option<&Vec<KeyValue>>,
501) -> Option<HashMap<String, String>> {
502 match key_value_metadata {
503 Some(key_values) => {
504 let map: HashMap<String, String> = key_values
505 .iter()
506 .filter_map(|kv| {
507 kv.value
508 .as_ref()
509 .map(|value| (kv.key.clone(), value.clone()))
510 })
511 .collect();
512
513 if map.is_empty() { None } else { Some(map) }
514 }
515 None => None,
516 }
517}
518
519pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
521 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
522 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
523
524 let parquet_type = parquet_column.self_type();
525 let basic_info = parquet_type.get_basic_info();
526
527 let mut hash_map_size = 0;
528 if basic_info.has_id() {
529 hash_map_size += 1;
530 }
531 if has_extension_type(parquet_type) {
532 hash_map_size += 1;
533 }
534 if hash_map_size == 0 {
535 return Ok(ret);
536 }
537 ret.set_metadata(HashMap::with_capacity(hash_map_size));
538 if basic_info.has_id() {
539 ret.metadata_mut().insert(
540 PARQUET_FIELD_ID_META_KEY.to_string(),
541 basic_info.id().to_string(),
542 );
543 }
544 try_add_extension_type(ret, parquet_column.self_type())
545}
546
/// Returns the byte length computed for a decimal of the given `precision`:
/// `ceil((log2(10^precision + 1) + 1) / 8)`.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let magnitude = 10.0_f64.powi(i32::from(precision)) + 1.0;
    let bits = magnitude.log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
557
558fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
560 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
561 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
562 const PARQUET_KEY_FIELD_NAME: &str = "key";
563 const PARQUET_VALUE_FIELD_NAME: &str = "value";
564
565 let name = field.name().as_str();
566 let repetition = if field.is_nullable() {
567 Repetition::OPTIONAL
568 } else {
569 Repetition::REQUIRED
570 };
571 let id = field_id(field);
572 match field.data_type() {
574 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
575 .with_logical_type(Some(LogicalType::Unknown))
576 .with_repetition(repetition)
577 .with_id(id)
578 .build(),
579 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
580 .with_repetition(repetition)
581 .with_id(id)
582 .build(),
583 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
584 .with_logical_type(Some(LogicalType::integer(8, true)))
585 .with_repetition(repetition)
586 .with_id(id)
587 .build(),
588 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
589 .with_logical_type(Some(LogicalType::integer(16, true)))
590 .with_repetition(repetition)
591 .with_id(id)
592 .build(),
593 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
594 .with_repetition(repetition)
595 .with_id(id)
596 .build(),
597 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
598 .with_repetition(repetition)
599 .with_id(id)
600 .build(),
601 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
602 .with_logical_type(Some(LogicalType::integer(8, false)))
603 .with_repetition(repetition)
604 .with_id(id)
605 .build(),
606 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
607 .with_logical_type(Some(LogicalType::integer(16, false)))
608 .with_repetition(repetition)
609 .with_id(id)
610 .build(),
611 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
612 .with_logical_type(Some(LogicalType::integer(32, false)))
613 .with_repetition(repetition)
614 .with_id(id)
615 .build(),
616 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
617 .with_logical_type(Some(LogicalType::integer(64, false)))
618 .with_repetition(repetition)
619 .with_id(id)
620 .build(),
621 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
622 .with_repetition(repetition)
623 .with_id(id)
624 .with_logical_type(Some(LogicalType::Float16))
625 .with_length(2)
626 .build(),
627 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
628 .with_repetition(repetition)
629 .with_id(id)
630 .build(),
631 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
632 .with_repetition(repetition)
633 .with_id(id)
634 .build(),
635 DataType::Timestamp(TimeUnit::Second, _) => {
636 Type::primitive_type_builder(name, PhysicalType::INT64)
638 .with_repetition(repetition)
639 .with_id(id)
640 .build()
641 }
642 DataType::Timestamp(time_unit, tz) => {
643 Type::primitive_type_builder(name, PhysicalType::INT64)
644 .with_logical_type(Some(LogicalType::timestamp(
645 matches!(tz, Some(z) if !z.as_ref().is_empty()),
647 match time_unit {
648 TimeUnit::Second => unreachable!(),
649 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
650 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
651 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
652 },
653 )))
654 .with_repetition(repetition)
655 .with_id(id)
656 .build()
657 }
658 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
659 .with_logical_type(Some(LogicalType::Date))
660 .with_repetition(repetition)
661 .with_id(id)
662 .build(),
663 DataType::Date64 => {
664 if coerce_types {
665 Type::primitive_type_builder(name, PhysicalType::INT32)
666 .with_logical_type(Some(LogicalType::Date))
667 .with_repetition(repetition)
668 .with_id(id)
669 .build()
670 } else {
671 Type::primitive_type_builder(name, PhysicalType::INT64)
672 .with_repetition(repetition)
673 .with_id(id)
674 .build()
675 }
676 }
677 DataType::Time32(TimeUnit::Second) => {
678 Type::primitive_type_builder(name, PhysicalType::INT32)
680 .with_repetition(repetition)
681 .with_id(id)
682 .build()
683 }
684 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
685 .with_logical_type(Some(LogicalType::time(
686 field.metadata().contains_key("adjusted_to_utc"),
687 match unit {
688 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
689 u => unreachable!("Invalid unit for Time32: {:?}", u),
690 },
691 )))
692 .with_repetition(repetition)
693 .with_id(id)
694 .build(),
695 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
696 .with_logical_type(Some(LogicalType::time(
697 field.metadata().contains_key("adjusted_to_utc"),
698 match unit {
699 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
700 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
701 u => unreachable!("Invalid unit for Time64: {:?}", u),
702 },
703 )))
704 .with_repetition(repetition)
705 .with_id(id)
706 .build(),
707 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
708 .with_repetition(repetition)
709 .with_id(id)
710 .build(),
711 DataType::Interval(_) => {
712 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
713 .with_converted_type(ConvertedType::INTERVAL)
714 .with_repetition(repetition)
715 .with_id(id)
716 .with_length(12)
717 .build()
718 }
719 DataType::Binary | DataType::LargeBinary => {
720 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
721 .with_repetition(repetition)
722 .with_id(id)
723 .with_logical_type(logical_type_for_binary(field))
724 .build()
725 }
726 DataType::FixedSizeBinary(length) => {
727 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
728 .with_repetition(repetition)
729 .with_id(id)
730 .with_length(*length)
731 .with_logical_type(logical_type_for_fixed_size_binary(field))
732 .build()
733 }
734 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
735 .with_repetition(repetition)
736 .with_id(id)
737 .with_logical_type(logical_type_for_binary_view(field))
738 .build(),
739 DataType::Decimal32(precision, scale)
740 | DataType::Decimal64(precision, scale)
741 | DataType::Decimal128(precision, scale)
742 | DataType::Decimal256(precision, scale) => {
743 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
746 (PhysicalType::INT32, -1)
747 } else if *precision <= 18 {
748 (PhysicalType::INT64, -1)
749 } else {
750 (
751 PhysicalType::FIXED_LEN_BYTE_ARRAY,
752 decimal_length_from_precision(*precision) as i32,
753 )
754 };
755 Type::primitive_type_builder(name, physical_type)
756 .with_repetition(repetition)
757 .with_id(id)
758 .with_length(length)
759 .with_logical_type(Some(LogicalType::decimal(*scale as i32, *precision as i32)))
760 .with_precision(*precision as i32)
761 .with_scale(*scale as i32)
762 .build()
763 }
764 DataType::Utf8 | DataType::LargeUtf8 => {
765 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
766 .with_logical_type(logical_type_for_string(field))
767 .with_repetition(repetition)
768 .with_id(id)
769 .build()
770 }
771 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
772 .with_logical_type(logical_type_for_string(field))
773 .with_repetition(repetition)
774 .with_id(id)
775 .build(),
776 DataType::List(f)
777 | DataType::FixedSizeList(f, _)
778 | DataType::LargeList(f)
779 | DataType::ListView(f)
780 | DataType::LargeListView(f) => {
781 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
782 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
784 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
785 } else {
786 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
787 };
788
789 Type::group_type_builder(name)
790 .with_fields(vec![Arc::new(
791 Type::group_type_builder("list")
792 .with_fields(vec![field_ref])
793 .with_repetition(Repetition::REPEATED)
794 .build()?,
795 )])
796 .with_logical_type(Some(LogicalType::List))
797 .with_repetition(repetition)
798 .with_id(id)
799 .build()
800 }
801 DataType::Struct(fields) => {
802 if fields.is_empty() {
803 return Err(arrow_err!("Parquet does not support writing empty structs",));
804 }
805 let fields = fields
807 .iter()
808 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
809 .collect::<Result<_>>()?;
810 Type::group_type_builder(name)
811 .with_fields(fields)
812 .with_repetition(repetition)
813 .with_id(id)
814 .with_logical_type(logical_type_for_struct(field))
815 .build()
816 }
817 DataType::Map(field, _) => {
818 if let DataType::Struct(struct_fields) = field.data_type() {
819 let map_struct_name = if coerce_types {
821 PARQUET_MAP_STRUCT_NAME
822 } else {
823 field.name()
824 };
825
826 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
828 if coerce_types && fld.name() != name {
829 let f = fld.as_ref().clone().with_name(name);
830 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
831 } else {
832 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
833 }
834 };
835 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
836 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
837
838 Type::group_type_builder(name)
839 .with_fields(vec![Arc::new(
840 Type::group_type_builder(map_struct_name)
841 .with_fields(vec![key_field, val_field])
842 .with_repetition(Repetition::REPEATED)
843 .build()?,
844 )])
845 .with_logical_type(Some(LogicalType::Map))
846 .with_repetition(repetition)
847 .with_id(id)
848 .build()
849 } else {
850 Err(arrow_err!(
851 "DataType::Map should contain a struct field child",
852 ))
853 }
854 }
855 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
856 DataType::Dictionary(_, value) => {
857 let dict_field = field.clone().with_data_type(value.as_ref().clone());
859 arrow_to_parquet_type(&dict_field, coerce_types)
860 }
861 DataType::RunEndEncoded(_, value_field) => {
862 let ree_value_field = field
863 .clone()
864 .with_data_type(value_field.data_type().clone());
865 arrow_to_parquet_type(&ree_value_field, coerce_types)
866 }
867 }
868}
869
870fn field_id(field: &Field) -> Option<i32> {
871 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
872 value.parse().ok() }
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878
879 use std::{collections::HashMap, sync::Arc};
880
881 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
882 use crate::file::metadata::KeyValue;
883 use crate::file::reader::FileReader;
884 use crate::{
885 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
886 schema::{parser::parse_message_type, types::SchemaDescriptor},
887 };
888 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
889
/// Flat primitive Parquet columns map to the matching Arrow primitive types,
/// preserving nullability.
#[test]
fn test_flat_primitives() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 int16 (INT_16);
        REQUIRED INT32 uint8 (INTEGER(8,false));
        REQUIRED INT32 uint16 (INTEGER(16,false));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (UTF8);
        OPTIONAL BINARY string_2 (STRING);
        OPTIONAL BINARY json (JSON);
    }
    ";
    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

    let expected = Fields::from(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new("string_2", DataType::Utf8, true),
        json_field(),
    ]);

    assert_eq!(&expected, converted.fields());
}
932
933 fn json_field() -> Field {
936 #[cfg(feature = "arrow_canonical_extension_types")]
937 {
938 Field::new("json", DataType::Utf8, true)
939 .with_extension_type(arrow_schema::extension::Json::default())
940 }
941 #[cfg(not(feature = "arrow_canonical_extension_types"))]
942 {
943 Field::new("json", DataType::Utf8, true)
944 }
945 }
946
/// Decimal columns become `Decimal128` up to precision 38 and `Decimal256`
/// above that, whatever the physical type.
#[test]
fn test_decimal_fields() {
    let message_type = "
    message test_schema {
        REQUIRED INT32 decimal1 (DECIMAL(4,2));
        REQUIRED INT64 decimal2 (DECIMAL(12,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
        REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
        REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
        REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
    }
    ";

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

    let expected = Fields::from(vec![
        Field::new("decimal1", DataType::Decimal128(4, 2), false),
        Field::new("decimal2", DataType::Decimal128(12, 2), false),
        Field::new("decimal3", DataType::Decimal128(30, 2), false),
        Field::new("decimal4", DataType::Decimal128(33, 2), false),
        Field::new("decimal5", DataType::Decimal128(38, 2), false),
        Field::new("decimal6", DataType::Decimal256(39, 2), false),
        Field::new("decimal7", DataType::Decimal256(39, 2), false),
    ]);
    assert_eq!(&expected, converted.fields());
}
977
/// Unannotated byte arrays map to `Binary` and `FixedSizeBinary`.
#[test]
fn test_byte_array_fields() {
    let message_type = "
    message test_schema {
        REQUIRED BYTE_ARRAY binary;
        REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
    }
    ";

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

    let expected = Fields::from(vec![
        Field::new("binary", DataType::Binary, false),
        Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
    ]);
    assert_eq!(&expected, converted.fields());
}
998
/// `parquet_to_arrow_schema` and `parquet_to_arrow_schema_by_columns` with a
/// full projection mask yield the same fields.
#[test]
fn test_duplicate_fields() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
    }
    ";

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));

    let expected = Fields::from(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
    ]);

    let via_full_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
    assert_eq!(&expected, via_full_schema.fields());

    let via_projection =
        parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None).unwrap();
    assert_eq!(&expected, via_projection.fields());
}
1024
/// Exercises the supported list encodings; the expected fields are listed in
/// the same order as the groups in the message.
#[test]
fn test_parquet_lists() {
    let message_type = "
    message test_schema {
      REQUIRED GROUP my_list (LIST) {
        REPEATED GROUP list {
          OPTIONAL BINARY element (UTF8);
        }
      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED GROUP list {
          REQUIRED BINARY element (UTF8);
        }
      }
      OPTIONAL GROUP array_of_arrays (LIST) {
        REPEATED GROUP list {
          REQUIRED GROUP element (LIST) {
            REPEATED GROUP list {
              REQUIRED INT32 element;
            }
          }
        }
      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED GROUP element {
          REQUIRED BINARY str (UTF8);
        }
      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED INT32 element;
      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED GROUP element {
          REQUIRED BINARY str (UTF8);
          REQUIRED INT32 num;
        }
      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED GROUP array {
          REQUIRED BINARY str (UTF8);
        }

      }
      OPTIONAL GROUP my_list (LIST) {
        REPEATED GROUP my_list_tuple {
          REQUIRED BINARY str (UTF8);
        }
      }
      REPEATED INT32 name;
    }
    ";

    let expected_fields = vec![
        // Required list of optional strings.
        Field::new_list(
            "my_list",
            Field::new("element", DataType::Utf8, true),
            false,
        ),
        // Optional list of required strings.
        Field::new_list(
            "my_list",
            Field::new("element", DataType::Utf8, false),
            true,
        ),
        // Optional list whose elements are required lists of required ints.
        Field::new_list(
            "array_of_arrays",
            Field::new_list(
                "element",
                Field::new("element", DataType::Int32, false),
                false,
            ),
            true,
        ),
        // Repeated group `element` with a single child: the child is the element.
        Field::new_list("my_list", Field::new("str", DataType::Utf8, false), true),
        // Repeated primitive directly under the LIST group.
        Field::new_list(
            "my_list",
            Field::new("element", DataType::Int32, false),
            true,
        ),
        // Repeated group with two children: the group itself is a struct element.
        Field::new_list(
            "my_list",
            Field::new_struct(
                "element",
                vec![
                    Field::new("str", DataType::Utf8, false),
                    Field::new("num", DataType::Int32, false),
                ],
                false,
            ),
            true,
        ),
        // Repeated group named `array`: kept as a struct element.
        Field::new_list(
            "my_list",
            Field::new_struct(
                "array",
                vec![Field::new("str", DataType::Utf8, false)],
                false,
            ),
            true,
        ),
        // Repeated group named `<list name>_tuple`: kept as a struct element.
        Field::new_list(
            "my_list",
            Field::new_struct(
                "my_list_tuple",
                vec![Field::new("str", DataType::Utf8, false)],
                false,
            ),
            true,
        ),
        // Bare repeated primitive: required list of required values.
        Field::new_list("name", Field::new("name", DataType::Int32, false), false),
    ];

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
    let converted_fields = converted.fields();

    assert_eq!(expected_fields.len(), converted_fields.len());
    for (i, (expected, actual)) in expected_fields
        .iter()
        .zip(converted_fields.iter())
        .enumerate()
    {
        assert_eq!(expected, actual.as_ref(), "{i}");
    }
}
1227
/// Nullability of a list and of its elements are tracked independently.
#[test]
fn test_parquet_list_nullable() {
    let message_type = "
    message test_schema {
      REQUIRED GROUP my_list1 (LIST) {
        REPEATED GROUP list {
          OPTIONAL BINARY element (UTF8);
        }
      }
      OPTIONAL GROUP my_list2 (LIST) {
        REPEATED GROUP list {
          REQUIRED BINARY element (UTF8);
        }
      }
      REQUIRED GROUP my_list3 (LIST) {
        REPEATED GROUP list {
          REQUIRED BINARY element (UTF8);
        }
      }
    }
    ";

    let expected_fields = vec![
        // Required list, nullable elements.
        Field::new_list(
            "my_list1",
            Field::new("element", DataType::Utf8, true),
            false,
        ),
        // Nullable list, required elements.
        Field::new_list(
            "my_list2",
            Field::new("element", DataType::Utf8, false),
            true,
        ),
        // Required list, required elements.
        Field::new_list(
            "my_list3",
            Field::new("element", DataType::Utf8, false),
            false,
        ),
    ];

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
    let converted_fields = converted.fields();

    assert_eq!(expected_fields.len(), converted_fields.len());
    for (expected, actual) in expected_fields.iter().zip(converted_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1305
/// Exercises MAP / MAP_KEY_VALUE groups; the expected fields are listed in the
/// same order as the groups in the message.
#[test]
fn test_parquet_maps() {
    let message_type = "
    message test_schema {
      REQUIRED group my_map1 (MAP) {
        REPEATED group key_value {
          REQUIRED binary key (UTF8);
          OPTIONAL int32 value;
        }
      }
      OPTIONAL group my_map2 (MAP) {
        REPEATED group map {
          REQUIRED binary str (UTF8);
          REQUIRED int32 num;
        }
      }
      OPTIONAL group my_map3 (MAP_KEY_VALUE) {
        REPEATED group map {
          REQUIRED binary key (UTF8);
          OPTIONAL int32 value;
        }
      }
      REQUIRED group my_map4 (MAP) {
        REPEATED group map {
          OPTIONAL binary key (UTF8);
          REQUIRED int32 value;
        }
      }
    }
    ";

    let expected_fields = vec![
        // Required map, required keys, optional values.
        Field::new_map(
            "my_map1",
            "key_value",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
            false,
            false,
        ),
        // Optional map with non-standard entry and child names.
        Field::new_map(
            "my_map2",
            "map",
            Field::new("str", DataType::Utf8, false),
            Field::new("num", DataType::Int32, false),
            false,
            true,
        ),
        // Optional map annotated with MAP_KEY_VALUE.
        Field::new_map(
            "my_map3",
            "map",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
            false,
            true,
        ),
        // Key declared OPTIONAL in Parquet is still expected non-nullable in Arrow.
        Field::new_map(
            "my_map4",
            "map",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, false),
            false,
            false,
        ),
    ];

    let root = parse_message_type(message_type).unwrap();
    let parquet_schema = SchemaDescriptor::new(Arc::new(root));
    let converted = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
    let converted_fields = converted.fields();

    assert_eq!(expected_fields.len(), converted_fields.len());
    for (expected, actual) in expected_fields.iter().zip(converted_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1423
/// A required Parquet group of primitives followed by a top-level primitive
/// must convert to an Arrow struct field followed by a plain field.
#[test]
fn test_nested_schema() {
    let message_type = "
    message test_schema {
      REQUIRED GROUP group1 {
        REQUIRED BOOLEAN leaf1;
        REQUIRED INT32 leaf2;
      }
      REQUIRED INT64 leaf3;
    }
    ";

    let group1_children = Fields::from(vec![
        Field::new("leaf1", DataType::Boolean, false),
        Field::new("leaf2", DataType::Int32, false),
    ]);
    let expected_fields = vec![
        Field::new("group1", DataType::Struct(group1_children), false),
        Field::new("leaf3", DataType::Int64, false),
    ];

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    let arrow_schema = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual_fields = arrow_schema.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1459
/// Projects a subset of leaves out of a nested Parquet schema and checks that
/// the Arrow schema only contains the selected leaves, with their parent groups.
#[test]
fn test_nested_schema_partial() {
    let message_type = "
    message test_schema {
      REQUIRED GROUP group1 {
        REQUIRED INT64 leaf1;
        REQUIRED INT64 leaf2;
      }
      REQUIRED GROUP group2 {
        REQUIRED INT64 leaf3;
        REQUIRED INT64 leaf4;
      }
      REQUIRED INT64 leaf5;
    }
    ";

    // Only leaf1, leaf4 and leaf5 are expected to survive the projection.
    let expected_fields = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // The leaf indices are deliberately unordered and contain a duplicate (4);
    // the expected fields above are nevertheless in schema order and list
    // leaf5 only once.
    let projection = ProjectionMask::leaves(&descriptor, [3, 0, 4, 4]);
    let arrow_schema = parquet_to_arrow_schema_by_columns(&descriptor, projection, None).unwrap();
    let actual_fields = arrow_schema.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1510
/// The order in which a projection names its columns must not influence the
/// order of the fields in the converted Arrow schema, whether the projection
/// is given by leaf index or by column path.
#[test]
fn test_nested_schema_partial_ordering() {
    let message_type = "
    message test_schema {
      REQUIRED GROUP group1 {
        REQUIRED INT64 leaf1;
        REQUIRED INT64 leaf2;
      }
      REQUIRED GROUP group2 {
        REQUIRED INT64 leaf3;
        REQUIRED INT64 leaf4;
      }
      REQUIRED INT64 leaf5;
    }
    ";

    let expected_fields = vec![
        Field::new_struct(
            "group1",
            vec![Field::new("leaf1", DataType::Int64, false)],
            false,
        ),
        Field::new_struct(
            "group2",
            vec![Field::new("leaf4", DataType::Int64, false)],
            false,
        ),
        Field::new("leaf5", DataType::Int64, false),
    ];

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // Both masks select the same three leaves, each listed out of schema order.
    let projections = [
        ProjectionMask::leaves(&descriptor, [3, 0, 4]),
        ProjectionMask::columns(&descriptor, ["group2.leaf4", "group1.leaf1", "leaf5"]),
    ];

    for projection in projections {
        let arrow_schema =
            parquet_to_arrow_schema_by_columns(&descriptor, projection, None).unwrap();
        let actual_fields = arrow_schema.fields();

        assert_eq!(expected_fields.len(), actual_fields.len());
        for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
            assert_eq!(expected, actual.as_ref());
        }
    }
}
1572
/// Bare `REPEATED` groups (no `LIST` annotation) are expected to become Arrow
/// lists of non-nullable structs, including when one repeated group nests
/// inside another.
#[test]
fn test_repeated_nested_schema() {
    let message_type = "
    message test_schema {
      OPTIONAL INT32 leaf1;
      REPEATED GROUP outerGroup {
        OPTIONAL INT32 leaf2;
        REPEATED GROUP innerGroup {
          OPTIONAL INT32 leaf3;
        }
      }
    }
    ";

    // The list and its element struct share the name of the repeated group.
    let inner_list = Field::new_list(
        "innerGroup",
        Field::new_struct(
            "innerGroup",
            vec![Field::new("leaf3", DataType::Int32, true)],
            false,
        ),
        false,
    );
    let outer_list = Field::new_list(
        "outerGroup",
        Field::new_struct(
            "outerGroup",
            vec![Field::new("leaf2", DataType::Int32, true), inner_list],
            false,
        ),
        false,
    );
    let expected_fields = vec![Field::new("leaf1", DataType::Int32, true), outer_list];

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    let arrow_schema = parquet_to_arrow_schema(&descriptor, None).unwrap();
    let actual_fields = arrow_schema.fields();

    assert_eq!(expected_fields.len(), actual_fields.len());
    for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
        assert_eq!(expected, actual.as_ref());
    }
}
1623
/// Runs `parquet_to_arrow_field` over every leaf column of a flat Parquet
/// schema and compares the produced Arrow fields with the expected list.
#[test]
fn test_column_desc_to_field() {
    let message_type = "
    message test_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 uint8 (INTEGER(8,false));
        REQUIRED INT32 int16 (INT_16);
        REQUIRED INT32 uint16 (INTEGER(16,false));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (UTF8);
        REPEATED BOOLEAN bools;
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME_MILLIS);
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_nano (TIME(NANOS,false));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
        REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
        REPEATED INT32 int_list;
        REPEATED BINARY byte_list;
        REPEATED BINARY string_list (UTF8);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
    }
    ";

    // Expected Arrow field for each leaf, in schema order. Repeated primitives
    // are expected as lists whose element field reuses the column name.
    let expected_fields = vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("uint8", DataType::UInt8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("uint16", DataType::UInt16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("bools", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
        Field::new(
            "ts_milli",
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
            true,
        ),
        Field::new(
            "ts_micro",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_nano",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
            false,
        ),
        Field::new_list(
            "int_list",
            Field::new("int_list", DataType::Int32, false),
            false,
        ),
        Field::new_list(
            "byte_list",
            Field::new("byte_list", DataType::Binary, false),
            false,
        ),
        Field::new_list(
            "string_list",
            Field::new("string_list", DataType::Utf8, false),
            false,
        ),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
    ];

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
    let actual_fields: Vec<Field> = descriptor
        .columns()
        .iter()
        .map(|column| parquet_to_arrow_field(column).unwrap())
        .collect();

    assert_eq!(expected_fields, actual_fields);
}
1722
/// Converts an Arrow schema whose list and map children use non-Parquet names
/// (`item`, `entries`, `keys`, `values`) to Parquet, once with type coercion
/// enabled and once without, and checks the produced leaf columns.
///
/// Comparing only the number of columns cannot tell the two modes apart (both
/// yield three leaves), so the column paths are compared as well: with
/// coercion the children must be renamed to `list.element` /
/// `key_value.key` / `key_value.value`; without it the Arrow names must be
/// kept.
#[test]
fn test_coerced_map_list() {
    let arrow_schema = Schema::new(vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ]);

    // (value passed to `with_coerce_types`, Parquet schema expected from the conversion)
    let cases = [
        (
            true,
            "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP key_value {
                REQUIRED BINARY key (STRING);
                OPTIONAL INT32 value;
            }
        }
    }
    ",
        ),
        (
            false,
            "
    message parquet_schema {
        REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN item;
            }
        }
        OPTIONAL GROUP my_map (MAP) {
            REPEATED GROUP entries {
                REQUIRED BINARY keys (STRING);
                OPTIONAL INT32 values;
            }
        }
    }
    ",
        ),
    ];

    for (coerce_types, message_type) in cases {
        let expected_schema =
            SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
        let converted_schema = ArrowSchemaConverter::new()
            .with_coerce_types(coerce_types)
            .convert(&arrow_schema)
            .unwrap();

        assert_eq!(
            expected_schema.columns().len(),
            converted_schema.columns().len()
        );

        // Column paths exclude the root message name, so the differing root
        // names of the parsed and the converted schema do not matter here.
        for (expected, converted) in expected_schema
            .columns()
            .iter()
            .zip(converted_schema.columns())
        {
            assert_eq!(
                expected.path(),
                converted.path(),
                "unexpected column path with coerce_types = {coerce_types}"
            );
        }
    }
}
1797
/// Converts a wide Arrow schema to Parquet with the default
/// `ArrowSchemaConverter` and compares every leaf column with the column
/// parsed from the equivalent Parquet message type.
#[test]
fn test_field_to_column_desc() {
    let message_type = "
    message arrow_schema {
        REQUIRED BOOLEAN boolean;
        REQUIRED INT32 int8 (INT_8);
        REQUIRED INT32 int16 (INTEGER(16,true));
        REQUIRED INT32 int32;
        REQUIRED INT64 int64;
        OPTIONAL DOUBLE double;
        OPTIONAL FLOAT float;
        OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
        OPTIONAL BINARY string (STRING);
        OPTIONAL GROUP bools (LIST) {
            REPEATED GROUP list {
                OPTIONAL BOOLEAN element;
            }
        }
        REQUIRED GROUP bools_non_null (LIST) {
            REPEATED GROUP list {
                REQUIRED BOOLEAN element;
            }
        }
        OPTIONAL INT32 date (DATE);
        OPTIONAL INT32 time_milli (TIME(MILLIS,false));
        OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
        OPTIONAL INT64 time_micro (TIME_MICROS);
        OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
        OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
        REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
        REQUIRED INT64 ts_seconds;
        REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
        REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
        REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
        REQUIRED GROUP struct {
            REQUIRED BOOLEAN bools;
            REQUIRED INT32 uint32 (INTEGER(32,false));
            REQUIRED GROUP int32 (LIST) {
                REPEATED GROUP list {
                    OPTIONAL INT32 element;
                }
            }
        }
        REQUIRED BINARY dictionary_strings (STRING);
        REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
        REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
        REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
    }
    ";
    let expected_schema =
        SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // Arrow counterpart of the message type above, field for field.
    let arrow_schema = Schema::new(vec![
        Field::new("boolean", DataType::Boolean, false),
        Field::new("int8", DataType::Int8, false),
        Field::new("int16", DataType::Int16, false),
        Field::new("int32", DataType::Int32, false),
        Field::new("int64", DataType::Int64, false),
        Field::new("double", DataType::Float64, true),
        Field::new("float", DataType::Float32, true),
        Field::new("float16", DataType::Float16, true),
        Field::new("string", DataType::Utf8, true),
        Field::new_list(
            "bools",
            Field::new("element", DataType::Boolean, true),
            true,
        ),
        Field::new_list(
            "bools_non_null",
            Field::new("element", DataType::Boolean, false),
            false,
        ),
        Field::new("date", DataType::Date32, true),
        Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
        // The `adjusted_to_utc` metadata key pairs with the
        // `TIME(MILLIS,true)` column in the message type.
        Field::new(
            "time_milli_utc",
            DataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(HashMap::from([(
            "adjusted_to_utc".to_string(),
            "".to_string(),
        )])),
        Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
        Field::new(
            "time_micro_utc",
            DataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(HashMap::from([(
            "adjusted_to_utc".to_string(),
            "".to_string(),
        )])),
        Field::new(
            "ts_milli",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "ts_micro",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
        Field::new(
            "ts_seconds",
            DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_micro_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
            false,
        ),
        Field::new(
            "ts_millis_zero_negative_offset",
            DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
            false,
        ),
        Field::new(
            "ts_micro_non_utc",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
            false,
        ),
        Field::new_struct(
            "struct",
            vec![
                Field::new("bools", DataType::Boolean, false),
                Field::new("uint32", DataType::UInt32, false),
                Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
            ],
            false,
        ),
        Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
        Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
        Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
        Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        Field::new("decimal128", DataType::Decimal128(38, 2), false),
        Field::new("decimal256", DataType::Decimal256(39, 2), false),
    ]);
    let converted_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

    assert_eq!(
        expected_schema.columns().len(),
        converted_schema.columns().len()
    );

    for (expected, converted) in expected_schema
        .columns()
        .iter()
        .zip(converted_schema.columns())
    {
        if expected.logical_type_ref().is_some() {
            // The parsed column carries a logical type: require full equality.
            assert_eq!(expected, converted);
        } else {
            // No logical type on the parsed side: compare only the name, the
            // physical type and the converted type.
            assert_eq!(expected.name(), converted.name());
            assert_eq!(expected.physical_type(), converted.physical_type());
            assert_eq!(expected.converted_type(), converted.converted_type());
        }
    }
}
1974
/// Converting an Arrow struct with no children must fail; unwrapping the
/// error is expected to panic with the message named in `should_panic`.
#[test]
#[should_panic(expected = "Parquet does not support writing empty structs")]
fn test_empty_struct_field() {
    let empty_struct = Field::new("struct", DataType::Struct(Fields::empty()), false);
    let arrow_schema = Schema::new(vec![empty_struct]);

    let conversion = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema);

    // The panic asserted by this test originates from this unwrap.
    conversion.unwrap();
}
1990
/// Key/value metadata passed alongside the Parquet schema must show up as
/// Arrow schema metadata; the entry without a value is expected to be absent.
#[test]
fn test_metadata() {
    let message_type = "
    message test_schema {
        OPTIONAL BINARY string (STRING);
    }
    ";
    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // "baz" has no value, so only "foo" is expected in the Arrow metadata.
    let file_metadata = vec![
        KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
        KeyValue::new("baz".to_owned(), None),
    ];
    let expected: HashMap<String, String> = HashMap::from([("foo".to_owned(), "bar".to_owned())]);

    let arrow_schema = parquet_to_arrow_schema(&descriptor, Some(&file_metadata)).unwrap();

    assert_eq!(arrow_schema.metadata(), &expected);
}
2014
/// Writes an empty Parquet file for a wide Arrow schema, reads the schema
/// back, and checks that
/// 1. the Arrow schema survives the round trip unchanged, and
/// 2. the Parquet field ids found in the file are exactly the ones assigned
///    through `PARQUET_FIELD_ID_META_KEY` metadata.
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
    /// Builds a field-metadata map from string pairs.
    fn meta(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, false)
                .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
            Field::new("c2", DataType::Binary, false),
            Field::new("c3", DataType::FixedSizeBinary(3), false),
            Field::new("c4", DataType::Boolean, false),
            Field::new("c5", DataType::Date32, false),
            Field::new("c6", DataType::Date64, false),
            Field::new("c7", DataType::Time32(TimeUnit::Second), false),
            Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
            Field::new(
                "c16",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "c17",
                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                false,
            ),
            Field::new(
                "c18",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new_list(
                "c21",
                Field::new_list_field(DataType::Boolean, true)
                    .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                false,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
            Field::new(
                "c22",
                DataType::FixedSizeList(
                    Arc::new(Field::new_list_field(DataType::Boolean, true)),
                    5,
                ),
                false,
            ),
            Field::new_list(
                "c23",
                Field::new_large_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, false),
                                Field::new("c", DataType::Float32, false),
                                Field::new("d", DataType::Float16, false),
                            ]
                            .into(),
                        ),
                        false,
                    ),
                    true,
                ),
                false,
            ),
            Field::new(
                "c24",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::Utf8, false),
                    Field::new("b", DataType::UInt16, false),
                ])),
                false,
            ),
            Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
            Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
            // The `allow` silences the deprecation lint for `Field::new_dict`.
            #[allow(deprecated)]
            Field::new_dict(
                "c31",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
                123,
                true,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
            Field::new("c32", DataType::LargeBinary, true),
            Field::new("c33", DataType::LargeUtf8, true),
            Field::new_large_list(
                "c34",
                Field::new_list(
                    "inner",
                    Field::new_list_field(
                        DataType::Struct(
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, true),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                    true,
                ),
                true,
            ),
            Field::new("c35", DataType::Null, true),
            Field::new("c36", DataType::Decimal128(2, 1), false),
            Field::new("c37", DataType::Decimal256(50, 20), false),
            Field::new("c38", DataType::Decimal128(18, 12), true),
            Field::new_map(
                "c39",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                false,
                true,
            ),
            Field::new_map(
                "c40",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                false,
                true,
            )
            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
            Field::new_map(
                "c41",
                "my_entries",
                Field::new("my_key", DataType::Utf8, false),
                Field::new_list(
                    "my_value",
                    Field::new_list_field(DataType::Utf8, true)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                    true,
                ),
                false,
                false,
            ),
            Field::new("c42", DataType::Decimal32(5, 2), false),
            Field::new("c43", DataType::Decimal64(18, 12), true),
        ],
        meta(&[("Key", "Value")]),
    );

    // Write a file that contains only the schema (no record batches), then
    // reopen the same underlying file for reading.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    // 1. The Arrow schema must come back unchanged.
    assert_eq!(&schema, arrow_reader.schema().as_ref());

    // 2. Walk the Parquet schema tree depth-first and record
    //    "<dotted path> -> <id>" for every node that carries a field id.
    let root = arrow_reader.parquet_schema().root_schema_ptr();
    let mut pending = Vec::with_capacity(10);
    let mut found_ids = Vec::with_capacity(10);
    pending.push((root.name().to_string(), root));

    while let Some((path, node)) = pending.pop() {
        let info = node.get_basic_info();
        if info.has_id() {
            found_ids.push(format!("{path} -> {}", info.id()));
        }

        if node.is_group() {
            for child in node.get_fields() {
                pending.push((format!("{path}.{}", child.name()), child.clone()));
            }
        }
    }
    // Traversal order is an implementation detail; sort before comparing.
    found_ids.sort_unstable();

    assert_eq!(
        found_ids,
        [
            "arrow_schema.c1 -> 2",
            "arrow_schema.c21 -> 4",
            "arrow_schema.c21.list.item -> 5",
            "arrow_schema.c31 -> 6",
            "arrow_schema.c40 -> 7",
            "arrow_schema.c40.my_entries.my_key -> 8",
            "arrow_schema.c40.my_entries.my_value -> 9",
            "arrow_schema.c40.my_entries.my_value.list.item -> 10",
            "arrow_schema.c41.my_entries.my_value.list.item -> 11",
        ]
    );

    Ok(())
}
2232
/// Field ids written into a Parquet file must be recoverable from the Parquet
/// schema alone: the file is read back without key/value metadata, converted
/// to Arrow and back to Parquet, and the ids are checked on the result.
#[test]
fn test_read_parquet_field_ids_raw() -> Result<()> {
    /// Builds a field-metadata map from string pairs.
    fn meta(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    let schema = Schema::new_with_metadata(
        vec![
            Field::new("c1", DataType::Utf8, true)
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
            Field::new("c2", DataType::Utf8, true)
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
        ],
        HashMap::new(),
    );

    // Serialize an empty file into memory and reopen it with the low-level reader.
    let parquet_bytes =
        ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?.into_inner()?;
    let file_reader =
        crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
    let schema_descriptor = file_reader.metadata().file_metadata().schema_descr_ptr();

    // `None` for the key/value metadata: only the Parquet schema itself is
    // handed to the conversion.
    let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

    let roundtripped = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&arrow_schema)?;
    let parquet_fields = roundtripped.root_schema().get_fields();
    assert_eq!(parquet_fields.len(), 2);

    let field_ids: Vec<_> = parquet_fields
        .iter()
        .map(|field| field.get_basic_info().id())
        .collect();
    assert_eq!(field_ids, [1, 2]);

    Ok(())
}
2270
/// Round-trips an Arrow schema made of list, fixed-size list and nested
/// list-of-large-list fields with non-default element names through an empty
/// Parquet file and expects the schema to come back unchanged.
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let schema_metadata: HashMap<String, String> =
        HashMap::from([("Key".to_string(), "Value".to_string())]);

    let list_of_bools = Field::new_list("c21", Field::new("array", DataType::Boolean, true), false);
    let fixed_size_list = Field::new(
        "c22",
        DataType::FixedSizeList(
            Arc::new(Field::new("items", DataType::Boolean, false)),
            5,
        ),
        false,
    );
    let nested_list = Field::new_list(
        "c23",
        Field::new_large_list(
            "items",
            Field::new_struct(
                "items",
                vec![
                    Field::new("a", DataType::Int16, true),
                    Field::new("b", DataType::Float64, false),
                ],
                true,
            ),
            true,
        ),
        true,
    );

    let schema = Schema::new_with_metadata(
        vec![list_of_bools, fixed_size_list, nested_list],
        schema_metadata,
    );

    // Write a schema-only file, then reopen the same file for reading.
    let file = tempfile::tempfile().unwrap();
    ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?.close()?;

    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&schema, arrow_reader.schema().as_ref());

    Ok(())
}
2321
/// An empty string passed as the encoded schema must be rejected with an
/// error.
#[test]
fn test_get_arrow_schema_from_metadata() {
    let decoded = get_arrow_schema_from_metadata("");
    assert!(decoded.is_err());
}
2326
/// An Arrow `FixedSizeBinary(16)` field tagged with the `Uuid` extension type
/// must map to the Parquet `Uuid` logical type, and the extension type must
/// be restored when converting back to Arrow.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    use arrow_schema::extension::Uuid;

    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let arrow_schema = Schema::new(vec![uuid_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let uuid_column = parquet_schema.column(0);
    assert_eq!(uuid_column.logical_type_ref(), Some(&LogicalType::Uuid));

    // Parquet -> Arrow
    let roundtripped = parquet_to_arrow_schema(&parquet_schema, None)?;
    let restored = roundtripped.field(0).try_extension_type::<Uuid>()?;
    assert_eq!(restored, Uuid);

    Ok(())
}
2347
/// An Arrow `Utf8` field tagged with the `Json` extension type must map to
/// the Parquet `Json` logical type, and the extension type must be restored
/// when converting back to Arrow.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    use arrow_schema::extension::Json;

    let json_field = Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let arrow_schema = Schema::new(vec![json_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
    let json_column = parquet_schema.column(0);
    assert_eq!(json_column.logical_type_ref(), Some(&LogicalType::Json));

    // Parquet -> Arrow
    let roundtripped = parquet_to_arrow_schema(&parquet_schema, None)?;
    let restored = roundtripped.field(0).try_extension_type::<Json>()?;
    assert_eq!(restored, Json::default());

    Ok(())
}
2371
/// Passing an ordinary Arrow field in the virtual-columns slot must be
/// rejected with an error that says the field is not a virtual column.
#[test]
fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
    let message_type = "
    message test_schema {
        REQUIRED INT32 id;
    }
    ";
    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));

    // A plain field, built without any virtual-column marker.
    let plain_field = Arc::new(Field::new("regular_column", DataType::Int64, false));

    let outcome = parquet_to_arrow_field_levels_with_virtual(
        &descriptor,
        ProjectionMask::all(),
        None,
        &[plain_field],
    );

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("is not a virtual column"));
}
2399}