1use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43 has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44 logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45 try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49pub fn parquet_to_arrow_schema(
54 parquet_schema: &SchemaDescriptor,
55 key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60pub fn parquet_to_arrow_schema_by_columns(
63 parquet_schema: &SchemaDescriptor,
64 mask: ProjectionMask,
65 key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70pub(crate) fn parquet_to_arrow_schema_and_fields(
76 parquet_schema: &SchemaDescriptor,
77 mask: ProjectionMask,
78 key_value_metadata: Option<&Vec<KeyValue>>,
79 virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82 let maybe_schema = metadata
83 .remove(super::ARROW_SCHEMA_META_KEY)
84 .map(|value| get_arrow_schema_from_metadata(&value))
85 .transpose()?;
86
87 if let Some(arrow_schema) = &maybe_schema {
89 arrow_schema.metadata().iter().for_each(|(k, v)| {
90 metadata.entry(k.clone()).or_insert_with(|| v.clone());
91 });
92 }
93
94 let hint = maybe_schema.as_ref().map(|s| s.fields());
95 let field_levels =
96 parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98 Ok((schema, field_levels.levels))
99}
100
/// The result of converting a Parquet schema to Arrow: the top-level Arrow
/// [`Fields`] together with the internal [`ParquetField`] hierarchy needed
/// to interpret repetition/definition levels when decoding.
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Top-level Arrow fields of the converted schema
    pub(crate) fields: Fields,
    /// Root of the parsed Parquet field hierarchy; `None` when no columns
    /// were selected by the projection
    pub(crate) levels: Option<ParquetField>,
}
113
114pub fn parquet_to_arrow_field_levels(
135 schema: &SchemaDescriptor,
136 mask: ProjectionMask,
137 hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139 parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142pub fn parquet_to_arrow_field_levels_with_virtual(
172 schema: &SchemaDescriptor,
173 mask: ProjectionMask,
174 hint: Option<&Fields>,
175 virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177 for field in virtual_columns {
179 if !virtual_type::is_virtual_column(field) {
180 return Err(ParquetError::General(format!(
181 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182 field.name()
183 )));
184 }
185 }
186
187 let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189 Some(field) => field,
190 None if virtual_columns.is_empty() => {
191 return Ok(FieldLevels {
192 fields: Fields::empty(),
193 levels: None,
194 });
195 }
196 None => {
197 ParquetField {
199 rep_level: 0,
200 def_level: 0,
201 nullable: false,
202 arrow_type: DataType::Struct(Fields::empty()),
203 field_type: ParquetFieldType::Group {
204 children: Vec::new(),
205 },
206 }
207 }
208 };
209
210 if !virtual_columns.is_empty() {
212 match &mut parquet_field.field_type {
213 ParquetFieldType::Group { children } => {
214 let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216 unreachable!("Root field must be a struct");
217 };
218
219 let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222 for virtual_column in virtual_columns {
224 assert_eq!(
226 parquet_field.rep_level, 0,
227 "Virtual columns can only be added at rep level 0"
228 );
229 assert_eq!(
230 parquet_field.def_level, 0,
231 "Virtual columns can only be added at def level 0"
232 );
233
234 fields_vec.push(virtual_column.clone());
235 let virtual_parquet_field = complex::convert_virtual_field(
236 virtual_column,
237 parquet_field.rep_level,
238 parquet_field.def_level,
239 )?;
240 children.push(virtual_parquet_field);
241 }
242
243 parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245 }
246 _ => unreachable!("Root field must be a group"),
247 }
248 }
249
250 match &parquet_field.arrow_type {
251 DataType::Struct(fields) => Ok(FieldLevels {
252 fields: fields.clone(),
253 levels: Some(parquet_field),
254 }),
255 _ => unreachable!(),
256 }
257}
258
259fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261 let decoded = BASE64_STANDARD.decode(encoded_meta);
262 match decoded {
263 Ok(bytes) => {
264 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265 &bytes[8..]
266 } else {
267 bytes.as_slice()
268 };
269 match arrow_ipc::root_as_message(slice) {
270 Ok(message) => message
271 .header_as_schema()
272 .map(arrow_ipc::convert::fb_to_schema)
273 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274 Err(err) => {
275 Err(arrow_err!(
277 "Unable to get root as message stored in {}: {:?}",
278 super::ARROW_SCHEMA_META_KEY,
279 err
280 ))
281 }
282 }
283 }
284 Err(err) => {
285 Err(arrow_err!(
287 "Unable to decode the encoded schema stored in {}, {:?}",
288 super::ARROW_SCHEMA_META_KEY,
289 err
290 ))
291 }
292 }
293}
294
295pub fn encode_arrow_schema(schema: &Schema) -> String {
297 let options = writer::IpcWriteOptions::default();
298 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299 let data_gen = writer::IpcDataGenerator::default();
300 let mut serialized_schema =
301 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303 let schema_len = serialized_schema.ipc_message.len();
306 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309 len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311 BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
321 let encoded = encode_arrow_schema(schema);
322
323 let schema_kv = KeyValue {
324 key: super::ARROW_SCHEMA_META_KEY.to_string(),
325 value: Some(encoded),
326 };
327
328 let meta = props
329 .key_value_metadata
330 .get_or_insert_with(Default::default);
331
332 let schema_meta = meta
334 .iter()
335 .enumerate()
336 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
337 match schema_meta {
338 Some((i, _)) => {
339 meta.remove(i);
340 meta.push(schema_kv);
341 }
342 None => {
343 meta.push(schema_kv);
344 }
345 }
346}
347
/// Converter from an Arrow [`Schema`] to a Parquet [`SchemaDescriptor`].
///
/// Configure with [`ArrowSchemaConverter::with_coerce_types`] and
/// [`ArrowSchemaConverter::schema_root`], then call
/// [`ArrowSchemaConverter::convert`].
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name given to the root (message) group of the generated Parquet schema
    schema_root: &'a str,
    /// Whether to coerce Arrow types to their Parquet-spec-mandated forms
    /// (see `with_coerce_types`)
    coerce_types: bool,
}
401
impl Default for ArrowSchemaConverter<'_> {
    /// Equivalent to [`ArrowSchemaConverter::new`].
    fn default() -> Self {
        Self::new()
    }
}
407
408impl<'a> ArrowSchemaConverter<'a> {
409 pub fn new() -> Self {
411 Self {
412 schema_root: "arrow_schema",
413 coerce_types: false,
414 }
415 }
416
417 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
448 self.coerce_types = coerce_types;
449 self
450 }
451
452 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
454 self.schema_root = schema_root;
455 self
456 }
457
458 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
462 let fields = schema
463 .fields()
464 .iter()
465 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
466 .collect::<Result<_>>()?;
467 let group = Type::group_type_builder(self.schema_root)
468 .with_fields(fields)
469 .build()?;
470 Ok(SchemaDescriptor::new(Arc::new(group)))
471 }
472}
473
474fn parse_key_value_metadata(
475 key_value_metadata: Option<&Vec<KeyValue>>,
476) -> Option<HashMap<String, String>> {
477 match key_value_metadata {
478 Some(key_values) => {
479 let map: HashMap<String, String> = key_values
480 .iter()
481 .filter_map(|kv| {
482 kv.value
483 .as_ref()
484 .map(|value| (kv.key.clone(), value.clone()))
485 })
486 .collect();
487
488 if map.is_empty() { None } else { Some(map) }
489 }
490 None => None,
491 }
492}
493
494pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
496 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
497 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
498
499 let parquet_type = parquet_column.self_type();
500 let basic_info = parquet_type.get_basic_info();
501
502 let mut hash_map_size = 0;
503 if basic_info.has_id() {
504 hash_map_size += 1;
505 }
506 if has_extension_type(parquet_type) {
507 hash_map_size += 1;
508 }
509 if hash_map_size == 0 {
510 return Ok(ret);
511 }
512 ret.set_metadata(HashMap::with_capacity(hash_map_size));
513 if basic_info.has_id() {
514 ret.metadata_mut().insert(
515 PARQUET_FIELD_ID_META_KEY.to_string(),
516 basic_info.id().to_string(),
517 );
518 }
519 try_add_extension_type(ret, parquet_column.self_type())
520}
521
/// Returns the minimum number of bytes a `FIXED_LEN_BYTE_ARRAY` needs to
/// hold a decimal of the given `precision` (stored two's-complement,
/// big-endian, per the Parquet specification).
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // Bits required to represent 10^precision plus a sign bit, rounded up
    // to whole bytes.
    let bits_needed = (10.0_f64.powi(i32::from(precision)) + 1.0).log2() + 1.0;
    (bits_needed / 8.0).ceil() as usize
}
532
/// Converts an Arrow [`Field`] into the corresponding Parquet [`Type`].
///
/// `coerce_types` forces Parquet-spec-mandated representations: `Date64`
/// is narrowed to a 32-bit date, list elements are renamed to `"element"`,
/// and map children to `"key_value"`/`"key"`/`"value"`. With it disabled,
/// the original Arrow names and 64-bit date representation are preserved.
///
/// Field ids are propagated from the field metadata via [`field_id`].
///
/// # Errors
///
/// Returns an error for empty structs, malformed maps, and
/// `RunEndEncoded` types; panics (`unimplemented!`) for unions.
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    // Names mandated by the Parquet spec, applied when `coerce_types` is set
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    // Arrow nullability maps directly onto Parquet repetition
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Small integers are stored as INT32 with a width-annotating
        // Integer logical type
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Float16 has no Parquet physical type; stored as a 2-byte
        // FIXED_LEN_BYTE_ARRAY with the Float16 logical type
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Parquet's Timestamp logical type has no seconds unit, so
        // second-precision timestamps are written as a bare INT64
        DataType::Timestamp(TimeUnit::Second, _) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // A present, non-empty timezone string marks the value
                    // as UTC-adjusted
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        // Seconds handled by the arm above
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            // The Parquet Date logical type is 32-bit; coercion narrows,
            // otherwise the raw 64-bit value is kept without a logical type
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        // The Parquet Time logical type has no seconds unit either
        DataType::Time32(TimeUnit::Second) => {
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                // NOTE(review): driven by the presence of an
                // "adjusted_to_utc" metadata key; its value is ignored —
                // confirm this is the intended contract
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // Legacy INTERVAL converted type: 12 bytes (months, days, millis)
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_logical_type(logical_type_for_binary(field))
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(logical_type_for_fixed_size_binary(field))
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(logical_type_for_binary_view(field))
            .build(),
        DataType::Decimal32(precision, scale)
        | DataType::Decimal64(precision, scale)
        | DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // Pick the narrowest physical type that can hold `precision`
            // digits: INT32 up to 9, INT64 up to 18, then a sized
            // FIXED_LEN_BYTE_ARRAY.
            // NOTE(review): precision == 1 is excluded from the INT32 branch
            // (`*precision > 1`) and falls through to INT64 — presumably
            // intentional upstream behavior; confirm before changing
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type(logical_type_for_string(field))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type(logical_type_for_string(field))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        // All Arrow list flavors are written as the standard 3-level
        // Parquet list: <name> (LIST) > repeated "list" > element
        DataType::List(f)
        | DataType::FixedSizeList(f, _)
        | DataType::LargeList(f)
        | DataType::ListView(f)
        | DataType::LargeListView(f) => {
            // With coercion, rename the element to the spec-mandated name
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // Convert children recursively; fails on the first error
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .with_logical_type(logical_type_for_struct(field))
                .build()
        }
        DataType::Map(field, _) => {
            // Arrow maps carry a single struct child with [key, value] fields
            if let DataType::Struct(struct_fields) = field.data_type() {
                // With coercion, use the spec name for the repeated group
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // Renames a map child to the spec-mandated name when coercing
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        // Dictionaries are written as their value type; the dictionary
        // encoding itself is a storage concern, not a schema one
        DataType::Dictionary(_, value) => {
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}
862
863fn field_id(field: &Field) -> Option<i32> {
864 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
865 value.parse().ok() }
867
868#[cfg(test)]
869mod tests {
870 use super::*;
871
872 use std::{collections::HashMap, sync::Arc};
873
874 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
875 use crate::file::metadata::KeyValue;
876 use crate::file::reader::FileReader;
877 use crate::{
878 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
879 schema::{parser::parse_message_type, types::SchemaDescriptor},
880 };
881 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
882
    // Flat Parquet schema covering primitive types, including legacy
    // converted types (INT_8, UTF8) and modern ones (INTEGER(8,false),
    // STRING, JSON); each column must map to the expected Arrow field.
    #[test]
    fn test_flat_primitives() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            OPTIONAL BINARY  string_2 (STRING);
            OPTIONAL BINARY  json (JSON);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_2", DataType::Utf8, true),
            // JSON mapping depends on the extension-types feature; see json_field()
            json_field(),
        ]);

        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }
925
    // Expected Arrow field for a Parquet JSON column: a plain Utf8 field,
    // tagged with the canonical Json extension type only when that feature
    // is enabled.
    fn json_field() -> Field {
        #[cfg(feature = "arrow_canonical_extension_types")]
        {
            Field::new("json", DataType::Utf8, true)
                .with_extension_type(arrow_schema::extension::Json::default())
        }
        #[cfg(not(feature = "arrow_canonical_extension_types"))]
        {
            Field::new("json", DataType::Utf8, true)
        }
    }
939
    // Decimal columns: precision <= 38 converts to Decimal128 regardless of
    // physical type; precision > 38 requires Decimal256.
    #[test]
    fn test_decimal_fields() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(4,2));
            REQUIRED INT64 decimal2 (DECIMAL(12,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
            REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
            REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
            REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("decimal1", DataType::Decimal128(4, 2), false),
            Field::new("decimal2", DataType::Decimal128(12, 2), false),
            Field::new("decimal3", DataType::Decimal128(30, 2), false),
            Field::new("decimal4", DataType::Decimal128(33, 2), false),
            Field::new("decimal5", DataType::Decimal128(38, 2), false),
            Field::new("decimal6", DataType::Decimal256(39, 2), false),
            Field::new("decimal7", DataType::Decimal256(39, 2), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }
970
971 #[test]
972 fn test_byte_array_fields() {
973 let message_type = "
974 message test_schema {
975 REQUIRED BYTE_ARRAY binary;
976 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
977 }
978 ";
979
980 let parquet_group_type = parse_message_type(message_type).unwrap();
981
982 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
983 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
984
985 let arrow_fields = Fields::from(vec![
986 Field::new("binary", DataType::Binary, false),
987 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
988 ]);
989 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
990 }
991
    // The full-schema and by-columns (full projection) conversion entry
    // points must produce identical Arrow fields for the same input.
    #[test]
    fn test_duplicate_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());

        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
                .unwrap();
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }
1017
    // Exercises LIST-annotated groups in the standard 3-level layout and in
    // several legacy 2-level layouts; for each entry in the message type
    // below an expected Arrow field is pushed, in the same order.
    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // Standard 3-level list, optional element
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // Standard 3-level list, required element
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Nested list-of-list
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // Legacy 2-level: repeated group "element" with one child collapses
        // to a list of that child
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // Legacy 2-level: repeated primitive directly under the LIST group
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // Legacy 2-level: repeated group with multiple children becomes a
        // list of structs
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // Legacy 2-level named "array": kept as a struct element
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // Legacy 2-level named "<name>_tuple": kept as a struct element
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // Bare repeated primitive (no LIST annotation) becomes a list
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }
1220
    // List nullability: the outer group's repetition (REQUIRED/OPTIONAL)
    // controls the Arrow list field's nullability, while the inner element's
    // repetition controls the element's nullability.
    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // Required list of optional elements
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // Optional list of required elements
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Required list of required elements
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1298
    // MAP / legacy MAP_KEY_VALUE groups: child group names are preserved,
    // non-"key_value" names are tolerated, and keys are always treated as
    // non-nullable in the Arrow schema.
    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // Standard MAP with spec-named children
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // MAP with non-standard child names, preserved in the Arrow schema
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // Legacy MAP_KEY_VALUE annotation, treated like MAP
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // OPTIONAL key in the file: the Arrow map key is still non-nullable
        {
            arrow_fields.push(Field::new_map(
                "my_map4",
                "map",
                Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
                false,
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1416
1417 #[test]
1418 fn test_nested_schema() {
1419 let mut arrow_fields = Vec::new();
1420 {
1421 let group1_fields = Fields::from(vec![
1422 Field::new("leaf1", DataType::Boolean, false),
1423 Field::new("leaf2", DataType::Int32, false),
1424 ]);
1425 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1426 arrow_fields.push(group1_struct);
1427
1428 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1429 arrow_fields.push(leaf3_field);
1430 }
1431
1432 let message_type = "
1433 message test_schema {
1434 REQUIRED GROUP group1 {
1435 REQUIRED BOOLEAN leaf1;
1436 REQUIRED INT32 leaf2;
1437 }
1438 REQUIRED INT64 leaf3;
1439 }
1440 ";
1441 let parquet_group_type = parse_message_type(message_type).unwrap();
1442
1443 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1444 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1445 let converted_fields = converted_arrow_schema.fields();
1446
1447 assert_eq!(arrow_fields.len(), converted_fields.len());
1448 for i in 0..arrow_fields.len() {
1449 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1450 }
1451 }
1452
1453 #[test]
1454 fn test_nested_schema_partial() {
1455 let mut arrow_fields = Vec::new();
1456 {
1457 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1458 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1459 arrow_fields.push(group1);
1460
1461 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1462 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1463 arrow_fields.push(group2);
1464
1465 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1466 }
1467
1468 let message_type = "
1469 message test_schema {
1470 REQUIRED GROUP group1 {
1471 REQUIRED INT64 leaf1;
1472 REQUIRED INT64 leaf2;
1473 }
1474 REQUIRED GROUP group2 {
1475 REQUIRED INT64 leaf3;
1476 REQUIRED INT64 leaf4;
1477 }
1478 REQUIRED INT64 leaf5;
1479 }
1480 ";
1481 let parquet_group_type = parse_message_type(message_type).unwrap();
1482
1483 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1493 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1494 let converted_arrow_schema =
1495 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1496 let converted_fields = converted_arrow_schema.fields();
1497
1498 assert_eq!(arrow_fields.len(), converted_fields.len());
1499 for i in 0..arrow_fields.len() {
1500 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1501 }
1502 }
1503
1504 #[test]
1505 fn test_nested_schema_partial_ordering() {
1506 let mut arrow_fields = Vec::new();
1507 {
1508 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1509 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1510 arrow_fields.push(group1);
1511
1512 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1513 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1514 arrow_fields.push(group2);
1515
1516 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1517 }
1518
1519 let message_type = "
1520 message test_schema {
1521 REQUIRED GROUP group1 {
1522 REQUIRED INT64 leaf1;
1523 REQUIRED INT64 leaf2;
1524 }
1525 REQUIRED GROUP group2 {
1526 REQUIRED INT64 leaf3;
1527 REQUIRED INT64 leaf4;
1528 }
1529 REQUIRED INT64 leaf5;
1530 }
1531 ";
1532 let parquet_group_type = parse_message_type(message_type).unwrap();
1533
1534 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1544 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1545 let converted_arrow_schema =
1546 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1547 let converted_fields = converted_arrow_schema.fields();
1548
1549 assert_eq!(arrow_fields.len(), converted_fields.len());
1550 for i in 0..arrow_fields.len() {
1551 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1552 }
1553
1554 let mask =
1555 ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1556 let converted_arrow_schema =
1557 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1558 let converted_fields = converted_arrow_schema.fields();
1559
1560 assert_eq!(arrow_fields.len(), converted_fields.len());
1561 for i in 0..arrow_fields.len() {
1562 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1563 }
1564 }
1565
1566 #[test]
1567 fn test_repeated_nested_schema() {
1568 let mut arrow_fields = Vec::new();
1569 {
1570 arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1571
1572 let inner_group_list = Field::new_list(
1573 "innerGroup",
1574 Field::new_struct(
1575 "innerGroup",
1576 vec![Field::new("leaf3", DataType::Int32, true)],
1577 false,
1578 ),
1579 false,
1580 );
1581
1582 let outer_group_list = Field::new_list(
1583 "outerGroup",
1584 Field::new_struct(
1585 "outerGroup",
1586 vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1587 false,
1588 ),
1589 false,
1590 );
1591 arrow_fields.push(outer_group_list);
1592 }
1593
1594 let message_type = "
1595 message test_schema {
1596 OPTIONAL INT32 leaf1;
1597 REPEATED GROUP outerGroup {
1598 OPTIONAL INT32 leaf2;
1599 REPEATED GROUP innerGroup {
1600 OPTIONAL INT32 leaf3;
1601 }
1602 }
1603 }
1604 ";
1605 let parquet_group_type = parse_message_type(message_type).unwrap();
1606
1607 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1608 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1609 let converted_fields = converted_arrow_schema.fields();
1610
1611 assert_eq!(arrow_fields.len(), converted_fields.len());
1612 for i in 0..arrow_fields.len() {
1613 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1614 }
1615 }
1616
    #[test]
    fn test_column_desc_to_field() {
        // Exercises the per-column conversion path (`parquet_to_arrow_field`)
        // across the primitive physical types, legacy converted types and
        // modern logical-type annotations declared below.
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
            REQUIRED INT32 uint8 (INTEGER(8,false));
            REQUIRED INT32 int16 (INT_16);
            REQUIRED INT32 uint16 (INTEGER(16,false));
            REQUIRED INT32 int32;
            REQUIRED INT64 int64;
            OPTIONAL DOUBLE double;
            OPTIONAL FLOAT float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY string (UTF8);
            REPEATED BOOLEAN bools;
            OPTIONAL INT32 date (DATE);
            OPTIONAL INT32 time_milli (TIME_MILLIS);
            OPTIONAL INT64 time_micro (TIME_MICROS);
            OPTIONAL INT64 time_nano (TIME(NANOS,false));
            OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
            REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
            REPEATED INT32 int_list;
            REPEATED BINARY byte_list;
            REPEATED BINARY string_list (UTF8);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        // Convert each leaf column descriptor to an Arrow field individually.
        let converted_arrow_fields = parquet_schema
            .columns()
            .iter()
            .map(|c| parquet_to_arrow_field(c).unwrap())
            .collect::<Vec<Field>>();

        // Expected Arrow fields, in the same order as the columns above.
        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            // REPEATED primitives surface as non-null lists whose item field
            // reuses the column name.
            Field::new_list(
                "bools",
                Field::new("bools", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
            // The legacy TIMESTAMP_* converted types (and TIMESTAMP(...,true))
            // are expected to come back with a "UTC" timezone.
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                false,
            ),
            Field::new_list(
                "int_list",
                Field::new("int_list", DataType::Int32, false),
                false,
            ),
            Field::new_list(
                "byte_list",
                Field::new("byte_list", DataType::Binary, false),
                false,
            ),
            Field::new_list(
                "string_list",
                Field::new("string_list", DataType::Utf8, false),
                false,
            ),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        ];

        assert_eq!(arrow_fields, converted_arrow_fields);
    }
1715
1716 #[test]
1717 fn test_coerced_map_list() {
1718 let arrow_fields = vec![
1720 Field::new_list(
1721 "my_list",
1722 Field::new("item", DataType::Boolean, true),
1723 false,
1724 ),
1725 Field::new_map(
1726 "my_map",
1727 "entries",
1728 Field::new("keys", DataType::Utf8, false),
1729 Field::new("values", DataType::Int32, true),
1730 false,
1731 true,
1732 ),
1733 ];
1734 let arrow_schema = Schema::new(arrow_fields);
1735
1736 let message_type = "
1738 message parquet_schema {
1739 REQUIRED GROUP my_list (LIST) {
1740 REPEATED GROUP list {
1741 OPTIONAL BOOLEAN element;
1742 }
1743 }
1744 OPTIONAL GROUP my_map (MAP) {
1745 REPEATED GROUP key_value {
1746 REQUIRED BINARY key (STRING);
1747 OPTIONAL INT32 value;
1748 }
1749 }
1750 }
1751 ";
1752 let parquet_group_type = parse_message_type(message_type).unwrap();
1753 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1754 let converted_arrow_schema = ArrowSchemaConverter::new()
1755 .with_coerce_types(true)
1756 .convert(&arrow_schema)
1757 .unwrap();
1758 assert_eq!(
1759 parquet_schema.columns().len(),
1760 converted_arrow_schema.columns().len()
1761 );
1762
1763 let message_type = "
1765 message parquet_schema {
1766 REQUIRED GROUP my_list (LIST) {
1767 REPEATED GROUP list {
1768 OPTIONAL BOOLEAN item;
1769 }
1770 }
1771 OPTIONAL GROUP my_map (MAP) {
1772 REPEATED GROUP entries {
1773 REQUIRED BINARY keys (STRING);
1774 OPTIONAL INT32 values;
1775 }
1776 }
1777 }
1778 ";
1779 let parquet_group_type = parse_message_type(message_type).unwrap();
1780 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1781 let converted_arrow_schema = ArrowSchemaConverter::new()
1782 .with_coerce_types(false)
1783 .convert(&arrow_schema)
1784 .unwrap();
1785 assert_eq!(
1786 parquet_schema.columns().len(),
1787 converted_arrow_schema.columns().len()
1788 );
1789 }
1790
    #[test]
    fn test_field_to_column_desc() {
        // Converts a wide Arrow schema to parquet column descriptors via
        // `ArrowSchemaConverter` and checks them against this hand-written
        // parquet schema.
        let message_type = "
        message arrow_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
            REQUIRED INT32 int16 (INTEGER(16,true));
            REQUIRED INT32 int32;
            REQUIRED INT64 int64;
            OPTIONAL DOUBLE double;
            OPTIONAL FLOAT float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY string (STRING);
            OPTIONAL GROUP bools (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            REQUIRED GROUP bools_non_null (LIST) {
                REPEATED GROUP list {
                    REQUIRED BOOLEAN element;
                }
            }
            OPTIONAL INT32 date (DATE);
            OPTIONAL INT32 time_milli (TIME(MILLIS,false));
            OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64 time_micro (TIME_MICROS);
            OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64 ts_seconds;
            REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
            REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
            REQUIRED GROUP struct {
                REQUIRED BOOLEAN bools;
                REQUIRED INT32 uint32 (INTEGER(32,false));
                REQUIRED GROUP int32 (LIST) {
                    REPEATED GROUP list {
                        OPTIONAL INT32 element;
                    }
                }
            }
            REQUIRED BINARY dictionary_strings (STRING);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        // Arrow fields corresponding, in order, to the parquet columns above.
        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("element", DataType::Boolean, true),
                true,
            ),
            Field::new_list(
                "bools_non_null",
                Field::new("element", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            // "adjusted_to_utc" metadata pairs this field with the
            // TIME(MILLIS,true) annotation declared above.
            Field::new(
                "time_milli_utc",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new(
                "time_micro_utc",
                DataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            // Second-resolution timestamps map to a bare INT64 column — note
            // `ts_seconds` above has no TIMESTAMP annotation.
            Field::new(
                "ts_seconds",
                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_micro_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            // Fixed-offset timezones (including +01:00) all pair with
            // TIMESTAMP(..., true) in the parquet schema above.
            Field::new(
                "ts_millis_zero_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_negative_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
                false,
            ),
            Field::new(
                "ts_micro_non_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
                false,
            ),
            Field::new_struct(
                "struct",
                vec![
                    Field::new("bools", DataType::Boolean, false),
                    Field::new("uint32", DataType::UInt32, false),
                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                ],
                false,
            ),
            // Dictionary encoding is physical only: the parquet column is a
            // plain STRING binary column.
            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
            Field::new("decimal128", DataType::Decimal128(38, 2), false),
            Field::new("decimal256", DataType::Decimal256(39, 2), false),
        ];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
        parquet_schema
            .columns()
            .iter()
            .zip(converted_arrow_schema.columns())
            .for_each(|(a, b)| {
                // Columns that declare a logical type must match exactly;
                // otherwise only name, physical and converted type are compared.
                match a.logical_type_ref() {
                    Some(_) => {
                        assert_eq!(a, b)
                    }
                    None => {
                        assert_eq!(a.name(), b.name());
                        assert_eq!(a.physical_type(), b.physical_type());
                        assert_eq!(a.converted_type(), b.converted_type());
                    }
                };
            });
    }
1967
1968 #[test]
1969 #[should_panic(expected = "Parquet does not support writing empty structs")]
1970 fn test_empty_struct_field() {
1971 let arrow_fields = vec![Field::new(
1972 "struct",
1973 DataType::Struct(Fields::empty()),
1974 false,
1975 )];
1976 let arrow_schema = Schema::new(arrow_fields);
1977 let converted_arrow_schema = ArrowSchemaConverter::new()
1978 .with_coerce_types(true)
1979 .convert(&arrow_schema);
1980
1981 converted_arrow_schema.unwrap();
1982 }
1983
1984 #[test]
1985 fn test_metadata() {
1986 let message_type = "
1987 message test_schema {
1988 OPTIONAL BINARY string (STRING);
1989 }
1990 ";
1991 let parquet_group_type = parse_message_type(message_type).unwrap();
1992
1993 let key_value_metadata = vec![
1994 KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1995 KeyValue::new("baz".to_owned(), None),
1996 ];
1997
1998 let mut expected_metadata: HashMap<String, String> = HashMap::new();
1999 expected_metadata.insert("foo".to_owned(), "bar".to_owned());
2000
2001 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
2002 let converted_arrow_schema =
2003 parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
2004
2005 assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
2006 }
2007
    #[test]
    fn test_arrow_schema_roundtrip() -> Result<()> {
        // Helper: build a field-metadata map from (key, value) string pairs.
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };

        // Schema covering a wide range of Arrow types; several fields carry
        // explicit parquet field ids via PARQUET_FIELD_ID_META_KEY metadata.
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, false)
                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                Field::new("c2", DataType::Binary, false),
                Field::new("c3", DataType::FixedSizeBinary(3), false),
                Field::new("c4", DataType::Boolean, false),
                Field::new("c5", DataType::Date32, false),
                Field::new("c6", DataType::Date64, false),
                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
                Field::new(
                    "c16",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                    false,
                ),
                Field::new(
                    "c17",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "c18",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                // Field ids on both the list (4) and its item (5).
                Field::new_list(
                    "c21",
                    Field::new_list_field(DataType::Boolean, true)
                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                    false,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, false),
                                    Field::new("c", DataType::Float32, false),
                                    Field::new("d", DataType::Float16, false),
                                ]
                                .into(),
                            ),
                            false,
                        ),
                        true,
                    ),
                    false,
                ),
                Field::new(
                    "c24",
                    DataType::Struct(Fields::from(vec![
                        Field::new("a", DataType::Utf8, false),
                        Field::new("b", DataType::UInt16, false),
                    ])),
                    false,
                ),
                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
                #[allow(deprecated)]
                Field::new_dict(
                    "c31",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                Field::new("c32", DataType::LargeBinary, true),
                Field::new("c33", DataType::LargeUtf8, true),
                Field::new_large_list(
                    "c34",
                    Field::new_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, true),
                                ]
                                .into(),
                            ),
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
                Field::new("c35", DataType::Null, true),
                Field::new("c36", DataType::Decimal128(2, 1), false),
                Field::new("c37", DataType::Decimal256(50, 20), false),
                Field::new("c38", DataType::Decimal128(18, 12), true),
                Field::new_map(
                    "c39",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                    false, true,
                ),
                // Map with field ids on the map (7), key (8), value (9) and
                // the value's list item (10).
                Field::new_map(
                    "c40",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                        true,
                    )
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                    false, true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                Field::new_map(
                    "c41",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                        true,
                    ),
                    false, false,
                ),
                Field::new("c42", DataType::Decimal32(5, 2), false),
                Field::new("c43", DataType::Decimal64(18, 12), true),
            ],
            meta(&[("Key", "Value")]),
        );

        // Write an empty file carrying this schema, then read it back.
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // The Arrow schema must round-trip exactly, metadata included.
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());

        // Walk the written parquet schema tree (depth-first) and collect
        // "path -> id" for every node that carries a field id.
        let mut stack = Vec::with_capacity(10);
        let mut out = Vec::with_capacity(10);

        let root = arrow_reader.parquet_schema().root_schema_ptr();
        stack.push((root.name().to_string(), root));

        while let Some((p, t)) = stack.pop() {
            if t.is_group() {
                for f in t.get_fields() {
                    stack.push((format!("{p}.{}", f.name()), f.clone()))
                }
            }

            let info = t.get_basic_info();
            if info.has_id() {
                out.push(format!("{p} -> {}", info.id()))
            }
        }
        // Sort for a deterministic comparison independent of traversal order.
        out.sort_unstable();
        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();

        // Exactly the annotated fields (and annotated children) have ids.
        assert_eq!(
            &out,
            &[
                "arrow_schema.c1 -> 2",
                "arrow_schema.c21 -> 4",
                "arrow_schema.c21.list.item -> 5",
                "arrow_schema.c31 -> 6",
                "arrow_schema.c40 -> 7",
                "arrow_schema.c40.my_entries.my_key -> 8",
                "arrow_schema.c40.my_entries.my_value -> 9",
                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
            ]
        );

        Ok(())
    }
2225
2226 #[test]
2227 fn test_read_parquet_field_ids_raw() -> Result<()> {
2228 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2229 a.iter()
2230 .map(|(a, b)| (a.to_string(), b.to_string()))
2231 .collect()
2232 };
2233 let schema = Schema::new_with_metadata(
2234 vec![
2235 Field::new("c1", DataType::Utf8, true)
2236 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2237 Field::new("c2", DataType::Utf8, true)
2238 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2239 ],
2240 HashMap::new(),
2241 );
2242
2243 let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2244 let parquet_bytes = writer.into_inner()?;
2245
2246 let reader =
2247 crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2248 let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2249
2250 let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2252
2253 let parq_schema_descr = ArrowSchemaConverter::new()
2254 .with_coerce_types(true)
2255 .convert(&arrow_schema)?;
2256 let parq_fields = parq_schema_descr.root_schema().get_fields();
2257 assert_eq!(parq_fields.len(), 2);
2258 assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2259 assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2260
2261 Ok(())
2262 }
2263
2264 #[test]
2265 fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2266 let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2267 .iter()
2268 .cloned()
2269 .collect();
2270
2271 let schema = Schema::new_with_metadata(
2272 vec![
2273 Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2274 Field::new(
2275 "c22",
2276 DataType::FixedSizeList(
2277 Arc::new(Field::new("items", DataType::Boolean, false)),
2278 5,
2279 ),
2280 false,
2281 ),
2282 Field::new_list(
2283 "c23",
2284 Field::new_large_list(
2285 "items",
2286 Field::new_struct(
2287 "items",
2288 vec![
2289 Field::new("a", DataType::Int16, true),
2290 Field::new("b", DataType::Float64, false),
2291 ],
2292 true,
2293 ),
2294 true,
2295 ),
2296 true,
2297 ),
2298 ],
2299 metadata,
2300 );
2301
2302 let file = tempfile::tempfile().unwrap();
2304 let writer =
2305 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2306 writer.close()?;
2307
2308 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2310 let read_schema = arrow_reader.schema();
2311 assert_eq!(&schema, read_schema.as_ref());
2312 Ok(())
2313 }
2314
2315 #[test]
2316 fn test_get_arrow_schema_from_metadata() {
2317 assert!(get_arrow_schema_from_metadata("").is_err());
2318 }
2319
2320 #[test]
2321 #[cfg(feature = "arrow_canonical_extension_types")]
2322 fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2323 use arrow_schema::extension::Uuid;
2324 let arrow_schema = Schema::new(vec![
2325 Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2326 ]);
2327
2328 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2329
2330 assert_eq!(
2331 parquet_schema.column(0).logical_type_ref(),
2332 Some(&LogicalType::Uuid)
2333 );
2334
2335 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2336 assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2337
2338 Ok(())
2339 }
2340
2341 #[test]
2342 #[cfg(feature = "arrow_canonical_extension_types")]
2343 fn arrow_json_to_parquet_json() -> Result<()> {
2344 use arrow_schema::extension::Json;
2345 let arrow_schema = Schema::new(vec![
2346 Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2347 ]);
2348
2349 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2350
2351 assert_eq!(
2352 parquet_schema.column(0).logical_type_ref(),
2353 Some(&LogicalType::Json)
2354 );
2355
2356 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2357 assert_eq!(
2358 arrow_schema.field(0).try_extension_type::<Json>()?,
2359 Json::default()
2360 );
2361
2362 Ok(())
2363 }
2364
2365 #[test]
2366 fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
2367 let message_type = "
2368 message test_schema {
2369 REQUIRED INT32 id;
2370 }
2371 ";
2372 let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
2373 let descriptor = SchemaDescriptor::new(parquet_schema);
2374
2375 let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
2377 let result = parquet_to_arrow_field_levels_with_virtual(
2378 &descriptor,
2379 ProjectionMask::all(),
2380 None,
2381 &[regular_field],
2382 );
2383
2384 assert!(result.is_err());
2385 assert!(
2386 result
2387 .unwrap_err()
2388 .to_string()
2389 .contains("is not a virtual column")
2390 );
2391 }
2392}