1use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29 ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43 has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44 logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45 try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49pub fn parquet_to_arrow_schema(
54 parquet_schema: &SchemaDescriptor,
55 key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57 parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60pub fn parquet_to_arrow_schema_by_columns(
63 parquet_schema: &SchemaDescriptor,
64 mask: ProjectionMask,
65 key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67 Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70pub(crate) fn parquet_to_arrow_schema_and_fields(
76 parquet_schema: &SchemaDescriptor,
77 mask: ProjectionMask,
78 key_value_metadata: Option<&Vec<KeyValue>>,
79 virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81 let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82 let maybe_schema = metadata
83 .remove(super::ARROW_SCHEMA_META_KEY)
84 .map(|value| get_arrow_schema_from_metadata(&value))
85 .transpose()?;
86
87 if let Some(arrow_schema) = &maybe_schema {
89 arrow_schema.metadata().iter().for_each(|(k, v)| {
90 metadata.entry(k.clone()).or_insert_with(|| v.clone());
91 });
92 }
93
94 let hint = maybe_schema.as_ref().map(|s| s.fields());
95 let field_levels =
96 parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97 let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98 Ok((schema, field_levels.levels))
99}
100
/// Result of converting a Parquet schema to Arrow: the Arrow fields together with the
/// [`ParquetField`] tree they were derived from.
///
/// Produced by [`parquet_to_arrow_field_levels`] and
/// [`parquet_to_arrow_field_levels_with_virtual`].
#[derive(Debug, Clone)]
pub struct FieldLevels {
    /// Top-level Arrow fields of the converted schema.
    pub(crate) fields: Fields,
    /// Root of the converted Parquet field tree, or `None` when the conversion produced
    /// no root field and no virtual columns were requested.
    pub(crate) levels: Option<ParquetField>,
}
113
114pub fn parquet_to_arrow_field_levels(
135 schema: &SchemaDescriptor,
136 mask: ProjectionMask,
137 hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139 parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142pub fn parquet_to_arrow_field_levels_with_virtual(
172 schema: &SchemaDescriptor,
173 mask: ProjectionMask,
174 hint: Option<&Fields>,
175 virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177 for field in virtual_columns {
179 if !virtual_type::is_virtual_column(field) {
180 return Err(ParquetError::General(format!(
181 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182 field.name()
183 )));
184 }
185 }
186
187 let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189 Some(field) => field,
190 None if virtual_columns.is_empty() => {
191 return Ok(FieldLevels {
192 fields: Fields::empty(),
193 levels: None,
194 });
195 }
196 None => {
197 ParquetField {
199 rep_level: 0,
200 def_level: 0,
201 nullable: false,
202 arrow_type: DataType::Struct(Fields::empty()),
203 field_type: ParquetFieldType::Group {
204 children: Vec::new(),
205 },
206 }
207 }
208 };
209
210 if !virtual_columns.is_empty() {
212 match &mut parquet_field.field_type {
213 ParquetFieldType::Group { children } => {
214 let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216 unreachable!("Root field must be a struct");
217 };
218
219 let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222 for virtual_column in virtual_columns {
224 assert_eq!(
226 parquet_field.rep_level, 0,
227 "Virtual columns can only be added at rep level 0"
228 );
229 assert_eq!(
230 parquet_field.def_level, 0,
231 "Virtual columns can only be added at def level 0"
232 );
233
234 fields_vec.push(virtual_column.clone());
235 let virtual_parquet_field = complex::convert_virtual_field(
236 virtual_column,
237 parquet_field.rep_level,
238 parquet_field.def_level,
239 )?;
240 children.push(virtual_parquet_field);
241 }
242
243 parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245 }
246 _ => unreachable!("Root field must be a group"),
247 }
248 }
249
250 match &parquet_field.arrow_type {
251 DataType::Struct(fields) => Ok(FieldLevels {
252 fields: fields.clone(),
253 levels: Some(parquet_field),
254 }),
255 _ => unreachable!(),
256 }
257}
258
259fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261 let decoded = BASE64_STANDARD.decode(encoded_meta);
262 match decoded {
263 Ok(bytes) => {
264 let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265 &bytes[8..]
266 } else {
267 bytes.as_slice()
268 };
269 match arrow_ipc::root_as_message(slice) {
270 Ok(message) => message
271 .header_as_schema()
272 .map(arrow_ipc::convert::fb_to_schema)
273 .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274 Err(err) => {
275 Err(arrow_err!(
277 "Unable to get root as message stored in {}: {:?}",
278 super::ARROW_SCHEMA_META_KEY,
279 err
280 ))
281 }
282 }
283 }
284 Err(err) => {
285 Err(arrow_err!(
287 "Unable to decode the encoded schema stored in {}, {:?}",
288 super::ARROW_SCHEMA_META_KEY,
289 err
290 ))
291 }
292 }
293}
294
295pub fn encode_arrow_schema(schema: &Schema) -> String {
297 let options = writer::IpcWriteOptions::default();
298 let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299 let data_gen = writer::IpcDataGenerator::default();
300 let mut serialized_schema =
301 data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303 let schema_len = serialized_schema.ipc_message.len();
306 let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307 len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308 len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309 len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311 BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
321 let encoded = encode_arrow_schema(schema);
322
323 let schema_kv = KeyValue {
324 key: super::ARROW_SCHEMA_META_KEY.to_string(),
325 value: Some(encoded),
326 };
327
328 let meta = props
329 .key_value_metadata
330 .get_or_insert_with(Default::default);
331
332 let schema_meta = meta
334 .iter()
335 .enumerate()
336 .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
337 match schema_meta {
338 Some((i, _)) => {
339 meta.remove(i);
340 meta.push(schema_kv);
341 }
342 None => {
343 meta.push(schema_kv);
344 }
345 }
346}
347
/// Converts an Arrow [`Schema`] into a Parquet [`SchemaDescriptor`].
///
/// Configure with [`Self::with_coerce_types`] and [`Self::schema_root`], then call
/// [`Self::convert`].
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name given to the root group of the generated Parquet schema.
    schema_root: &'a str,
    /// Passed to the per-field conversion; when `true`, some Arrow types and nested
    /// field names are coerced rather than written as-is.
    coerce_types: bool,
}
401
402impl Default for ArrowSchemaConverter<'_> {
403 fn default() -> Self {
404 Self::new()
405 }
406}
407
408impl<'a> ArrowSchemaConverter<'a> {
409 pub fn new() -> Self {
411 Self {
412 schema_root: "arrow_schema",
413 coerce_types: false,
414 }
415 }
416
417 pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
448 self.coerce_types = coerce_types;
449 self
450 }
451
452 pub fn schema_root(mut self, schema_root: &'a str) -> Self {
454 self.schema_root = schema_root;
455 self
456 }
457
458 pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
462 let fields = schema
463 .fields()
464 .iter()
465 .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
466 .collect::<Result<_>>()?;
467 let group = Type::group_type_builder(self.schema_root)
468 .with_fields(fields)
469 .build()?;
470 Ok(SchemaDescriptor::new(Arc::new(group)))
471 }
472}
473
474fn parse_key_value_metadata(
475 key_value_metadata: Option<&Vec<KeyValue>>,
476) -> Option<HashMap<String, String>> {
477 match key_value_metadata {
478 Some(key_values) => {
479 let map: HashMap<String, String> = key_values
480 .iter()
481 .filter_map(|kv| {
482 kv.value
483 .as_ref()
484 .map(|value| (kv.key.clone(), value.clone()))
485 })
486 .collect();
487
488 if map.is_empty() { None } else { Some(map) }
489 }
490 None => None,
491 }
492}
493
494pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
496 let field = complex::convert_type(&parquet_column.self_type_ptr())?;
497 let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
498
499 let parquet_type = parquet_column.self_type();
500 let basic_info = parquet_type.get_basic_info();
501
502 let mut hash_map_size = 0;
503 if basic_info.has_id() {
504 hash_map_size += 1;
505 }
506 if has_extension_type(parquet_type) {
507 hash_map_size += 1;
508 }
509 if hash_map_size == 0 {
510 return Ok(ret);
511 }
512 ret.set_metadata(HashMap::with_capacity(hash_map_size));
513 if basic_info.has_id() {
514 ret.metadata_mut().insert(
515 PARQUET_FIELD_ID_META_KEY.to_string(),
516 basic_info.id().to_string(),
517 );
518 }
519 try_add_extension_type(ret, parquet_column.self_type())
520}
521
/// Returns the number of bytes used for a fixed-length decimal of the given `precision`.
///
/// Computed as `ceil((log2(10^precision + 1) + 1) / 8)`: the bits needed for the magnitude
/// plus one extra bit, rounded up to whole bytes.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let bits = magnitude.log2() + 1.0;
    let bytes = bits / 8.0;
    bytes.ceil() as usize
}
532
533fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
535 const PARQUET_LIST_ELEMENT_NAME: &str = "element";
536 const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
537 const PARQUET_KEY_FIELD_NAME: &str = "key";
538 const PARQUET_VALUE_FIELD_NAME: &str = "value";
539
540 let name = field.name().as_str();
541 let repetition = if field.is_nullable() {
542 Repetition::OPTIONAL
543 } else {
544 Repetition::REQUIRED
545 };
546 let id = field_id(field);
547 match field.data_type() {
549 DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
550 .with_logical_type(Some(LogicalType::Unknown))
551 .with_repetition(repetition)
552 .with_id(id)
553 .build(),
554 DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
555 .with_repetition(repetition)
556 .with_id(id)
557 .build(),
558 DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
559 .with_logical_type(Some(LogicalType::Integer {
560 bit_width: 8,
561 is_signed: true,
562 }))
563 .with_repetition(repetition)
564 .with_id(id)
565 .build(),
566 DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
567 .with_logical_type(Some(LogicalType::Integer {
568 bit_width: 16,
569 is_signed: true,
570 }))
571 .with_repetition(repetition)
572 .with_id(id)
573 .build(),
574 DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
575 .with_repetition(repetition)
576 .with_id(id)
577 .build(),
578 DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
579 .with_repetition(repetition)
580 .with_id(id)
581 .build(),
582 DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
583 .with_logical_type(Some(LogicalType::Integer {
584 bit_width: 8,
585 is_signed: false,
586 }))
587 .with_repetition(repetition)
588 .with_id(id)
589 .build(),
590 DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
591 .with_logical_type(Some(LogicalType::Integer {
592 bit_width: 16,
593 is_signed: false,
594 }))
595 .with_repetition(repetition)
596 .with_id(id)
597 .build(),
598 DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
599 .with_logical_type(Some(LogicalType::Integer {
600 bit_width: 32,
601 is_signed: false,
602 }))
603 .with_repetition(repetition)
604 .with_id(id)
605 .build(),
606 DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
607 .with_logical_type(Some(LogicalType::Integer {
608 bit_width: 64,
609 is_signed: false,
610 }))
611 .with_repetition(repetition)
612 .with_id(id)
613 .build(),
614 DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
615 .with_repetition(repetition)
616 .with_id(id)
617 .with_logical_type(Some(LogicalType::Float16))
618 .with_length(2)
619 .build(),
620 DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
621 .with_repetition(repetition)
622 .with_id(id)
623 .build(),
624 DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
625 .with_repetition(repetition)
626 .with_id(id)
627 .build(),
628 DataType::Timestamp(TimeUnit::Second, _) => {
629 Type::primitive_type_builder(name, PhysicalType::INT64)
631 .with_repetition(repetition)
632 .with_id(id)
633 .build()
634 }
635 DataType::Timestamp(time_unit, tz) => {
636 Type::primitive_type_builder(name, PhysicalType::INT64)
637 .with_logical_type(Some(LogicalType::Timestamp {
638 is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
640 unit: match time_unit {
641 TimeUnit::Second => unreachable!(),
642 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
643 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
644 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
645 },
646 }))
647 .with_repetition(repetition)
648 .with_id(id)
649 .build()
650 }
651 DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
652 .with_logical_type(Some(LogicalType::Date))
653 .with_repetition(repetition)
654 .with_id(id)
655 .build(),
656 DataType::Date64 => {
657 if coerce_types {
658 Type::primitive_type_builder(name, PhysicalType::INT32)
659 .with_logical_type(Some(LogicalType::Date))
660 .with_repetition(repetition)
661 .with_id(id)
662 .build()
663 } else {
664 Type::primitive_type_builder(name, PhysicalType::INT64)
665 .with_repetition(repetition)
666 .with_id(id)
667 .build()
668 }
669 }
670 DataType::Time32(TimeUnit::Second) => {
671 Type::primitive_type_builder(name, PhysicalType::INT32)
673 .with_repetition(repetition)
674 .with_id(id)
675 .build()
676 }
677 DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
678 .with_logical_type(Some(LogicalType::Time {
679 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
680 unit: match unit {
681 TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
682 u => unreachable!("Invalid unit for Time32: {:?}", u),
683 },
684 }))
685 .with_repetition(repetition)
686 .with_id(id)
687 .build(),
688 DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
689 .with_logical_type(Some(LogicalType::Time {
690 is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
691 unit: match unit {
692 TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
693 TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
694 u => unreachable!("Invalid unit for Time64: {:?}", u),
695 },
696 }))
697 .with_repetition(repetition)
698 .with_id(id)
699 .build(),
700 DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
701 .with_repetition(repetition)
702 .with_id(id)
703 .build(),
704 DataType::Interval(_) => {
705 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
706 .with_converted_type(ConvertedType::INTERVAL)
707 .with_repetition(repetition)
708 .with_id(id)
709 .with_length(12)
710 .build()
711 }
712 DataType::Binary | DataType::LargeBinary => {
713 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
714 .with_repetition(repetition)
715 .with_id(id)
716 .with_logical_type(logical_type_for_binary(field))
717 .build()
718 }
719 DataType::FixedSizeBinary(length) => {
720 Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
721 .with_repetition(repetition)
722 .with_id(id)
723 .with_length(*length)
724 .with_logical_type(logical_type_for_fixed_size_binary(field))
725 .build()
726 }
727 DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
728 .with_repetition(repetition)
729 .with_id(id)
730 .with_logical_type(logical_type_for_binary_view(field))
731 .build(),
732 DataType::Decimal32(precision, scale)
733 | DataType::Decimal64(precision, scale)
734 | DataType::Decimal128(precision, scale)
735 | DataType::Decimal256(precision, scale) => {
736 let (physical_type, length) = if *precision > 1 && *precision <= 9 {
739 (PhysicalType::INT32, -1)
740 } else if *precision <= 18 {
741 (PhysicalType::INT64, -1)
742 } else {
743 (
744 PhysicalType::FIXED_LEN_BYTE_ARRAY,
745 decimal_length_from_precision(*precision) as i32,
746 )
747 };
748 Type::primitive_type_builder(name, physical_type)
749 .with_repetition(repetition)
750 .with_id(id)
751 .with_length(length)
752 .with_logical_type(Some(LogicalType::Decimal {
753 scale: *scale as i32,
754 precision: *precision as i32,
755 }))
756 .with_precision(*precision as i32)
757 .with_scale(*scale as i32)
758 .build()
759 }
760 DataType::Utf8 | DataType::LargeUtf8 => {
761 Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
762 .with_logical_type(logical_type_for_string(field))
763 .with_repetition(repetition)
764 .with_id(id)
765 .build()
766 }
767 DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
768 .with_logical_type(logical_type_for_string(field))
769 .with_repetition(repetition)
770 .with_id(id)
771 .build(),
772 DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
773 let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
774 let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
776 Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
777 } else {
778 Arc::new(arrow_to_parquet_type(f, coerce_types)?)
779 };
780
781 Type::group_type_builder(name)
782 .with_fields(vec![Arc::new(
783 Type::group_type_builder("list")
784 .with_fields(vec![field_ref])
785 .with_repetition(Repetition::REPEATED)
786 .build()?,
787 )])
788 .with_logical_type(Some(LogicalType::List))
789 .with_repetition(repetition)
790 .with_id(id)
791 .build()
792 }
793 DataType::ListView(_) | DataType::LargeListView(_) => {
794 unimplemented!("ListView/LargeListView not implemented")
795 }
796 DataType::Struct(fields) => {
797 if fields.is_empty() {
798 return Err(arrow_err!("Parquet does not support writing empty structs",));
799 }
800 let fields = fields
802 .iter()
803 .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
804 .collect::<Result<_>>()?;
805 Type::group_type_builder(name)
806 .with_fields(fields)
807 .with_repetition(repetition)
808 .with_id(id)
809 .with_logical_type(logical_type_for_struct(field))
810 .build()
811 }
812 DataType::Map(field, _) => {
813 if let DataType::Struct(struct_fields) = field.data_type() {
814 let map_struct_name = if coerce_types {
816 PARQUET_MAP_STRUCT_NAME
817 } else {
818 field.name()
819 };
820
821 let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
823 if coerce_types && fld.name() != name {
824 let f = fld.as_ref().clone().with_name(name);
825 Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
826 } else {
827 Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
828 }
829 };
830 let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
831 let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
832
833 Type::group_type_builder(name)
834 .with_fields(vec![Arc::new(
835 Type::group_type_builder(map_struct_name)
836 .with_fields(vec![key_field, val_field])
837 .with_repetition(Repetition::REPEATED)
838 .build()?,
839 )])
840 .with_logical_type(Some(LogicalType::Map))
841 .with_repetition(repetition)
842 .with_id(id)
843 .build()
844 } else {
845 Err(arrow_err!(
846 "DataType::Map should contain a struct field child",
847 ))
848 }
849 }
850 DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
851 DataType::Dictionary(_, value) => {
852 let dict_field = field.clone().with_data_type(value.as_ref().clone());
854 arrow_to_parquet_type(&dict_field, coerce_types)
855 }
856 DataType::RunEndEncoded(_, _) => Err(arrow_err!(
857 "Converting RunEndEncodedType to parquet not supported",
858 )),
859 }
860}
861
862fn field_id(field: &Field) -> Option<i32> {
863 let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
864 value.parse().ok() }
866
867#[cfg(test)]
868mod tests {
869 use super::*;
870
871 use std::{collections::HashMap, sync::Arc};
872
873 use crate::arrow::PARQUET_FIELD_ID_META_KEY;
874 use crate::file::metadata::KeyValue;
875 use crate::file::reader::FileReader;
876 use crate::{
877 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
878 schema::{parser::parse_message_type, types::SchemaDescriptor},
879 };
880 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
881
882 #[test]
883 fn test_flat_primitives() {
884 let message_type = "
885 message test_schema {
886 REQUIRED BOOLEAN boolean;
887 REQUIRED INT32 int8 (INT_8);
888 REQUIRED INT32 int16 (INT_16);
889 REQUIRED INT32 uint8 (INTEGER(8,false));
890 REQUIRED INT32 uint16 (INTEGER(16,false));
891 REQUIRED INT32 int32;
892 REQUIRED INT64 int64;
893 OPTIONAL DOUBLE double;
894 OPTIONAL FLOAT float;
895 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
896 OPTIONAL BINARY string (UTF8);
897 OPTIONAL BINARY string_2 (STRING);
898 OPTIONAL BINARY json (JSON);
899 }
900 ";
901 let parquet_group_type = parse_message_type(message_type).unwrap();
902
903 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
904 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
905
906 let arrow_fields = Fields::from(vec![
907 Field::new("boolean", DataType::Boolean, false),
908 Field::new("int8", DataType::Int8, false),
909 Field::new("int16", DataType::Int16, false),
910 Field::new("uint8", DataType::UInt8, false),
911 Field::new("uint16", DataType::UInt16, false),
912 Field::new("int32", DataType::Int32, false),
913 Field::new("int64", DataType::Int64, false),
914 Field::new("double", DataType::Float64, true),
915 Field::new("float", DataType::Float32, true),
916 Field::new("float16", DataType::Float16, true),
917 Field::new("string", DataType::Utf8, true),
918 Field::new("string_2", DataType::Utf8, true),
919 json_field(),
920 ]);
921
922 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
923 }
924
    /// Builds the Arrow field expected for the `json` column in `test_flat_primitives`.
    ///
    /// Exactly one of the two cfg-gated blocks below is compiled and acts as the function's
    /// tail expression.
    fn json_field() -> Field {
        // With canonical extension types enabled, the field also carries the JSON extension type.
        #[cfg(feature = "arrow_canonical_extension_types")]
        {
            Field::new("json", DataType::Utf8, true)
                .with_extension_type(arrow_schema::extension::Json::default())
        }
        // Without the feature, the column is expected as a plain nullable Utf8 field.
        #[cfg(not(feature = "arrow_canonical_extension_types"))]
        {
            Field::new("json", DataType::Utf8, true)
        }
    }
938
939 #[test]
940 fn test_decimal_fields() {
941 let message_type = "
942 message test_schema {
943 REQUIRED INT32 decimal1 (DECIMAL(4,2));
944 REQUIRED INT64 decimal2 (DECIMAL(12,2));
945 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
946 REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
947 REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
948 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
949 REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
950 }
951 ";
952
953 let parquet_group_type = parse_message_type(message_type).unwrap();
954
955 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
956 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
957
958 let arrow_fields = Fields::from(vec![
959 Field::new("decimal1", DataType::Decimal128(4, 2), false),
960 Field::new("decimal2", DataType::Decimal128(12, 2), false),
961 Field::new("decimal3", DataType::Decimal128(30, 2), false),
962 Field::new("decimal4", DataType::Decimal128(33, 2), false),
963 Field::new("decimal5", DataType::Decimal128(38, 2), false),
964 Field::new("decimal6", DataType::Decimal256(39, 2), false),
965 Field::new("decimal7", DataType::Decimal256(39, 2), false),
966 ]);
967 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
968 }
969
970 #[test]
971 fn test_byte_array_fields() {
972 let message_type = "
973 message test_schema {
974 REQUIRED BYTE_ARRAY binary;
975 REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
976 }
977 ";
978
979 let parquet_group_type = parse_message_type(message_type).unwrap();
980
981 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
982 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
983
984 let arrow_fields = Fields::from(vec![
985 Field::new("binary", DataType::Binary, false),
986 Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
987 ]);
988 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
989 }
990
991 #[test]
992 fn test_duplicate_fields() {
993 let message_type = "
994 message test_schema {
995 REQUIRED BOOLEAN boolean;
996 REQUIRED INT32 int8 (INT_8);
997 }
998 ";
999
1000 let parquet_group_type = parse_message_type(message_type).unwrap();
1001
1002 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1003 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1004
1005 let arrow_fields = Fields::from(vec![
1006 Field::new("boolean", DataType::Boolean, false),
1007 Field::new("int8", DataType::Int8, false),
1008 ]);
1009 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1010
1011 let converted_arrow_schema =
1012 parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
1013 .unwrap();
1014 assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1015 }
1016
1017 #[test]
1018 fn test_parquet_lists() {
1019 let mut arrow_fields = Vec::new();
1020
1021 let message_type = "
1023 message test_schema {
1024 REQUIRED GROUP my_list (LIST) {
1025 REPEATED GROUP list {
1026 OPTIONAL BINARY element (UTF8);
1027 }
1028 }
1029 OPTIONAL GROUP my_list (LIST) {
1030 REPEATED GROUP list {
1031 REQUIRED BINARY element (UTF8);
1032 }
1033 }
1034 OPTIONAL GROUP array_of_arrays (LIST) {
1035 REPEATED GROUP list {
1036 REQUIRED GROUP element (LIST) {
1037 REPEATED GROUP list {
1038 REQUIRED INT32 element;
1039 }
1040 }
1041 }
1042 }
1043 OPTIONAL GROUP my_list (LIST) {
1044 REPEATED GROUP element {
1045 REQUIRED BINARY str (UTF8);
1046 }
1047 }
1048 OPTIONAL GROUP my_list (LIST) {
1049 REPEATED INT32 element;
1050 }
1051 OPTIONAL GROUP my_list (LIST) {
1052 REPEATED GROUP element {
1053 REQUIRED BINARY str (UTF8);
1054 REQUIRED INT32 num;
1055 }
1056 }
1057 OPTIONAL GROUP my_list (LIST) {
1058 REPEATED GROUP array {
1059 REQUIRED BINARY str (UTF8);
1060 }
1061
1062 }
1063 OPTIONAL GROUP my_list (LIST) {
1064 REPEATED GROUP my_list_tuple {
1065 REQUIRED BINARY str (UTF8);
1066 }
1067 }
1068 REPEATED INT32 name;
1069 }
1070 ";
1071
1072 {
1079 arrow_fields.push(Field::new_list(
1080 "my_list",
1081 Field::new("element", DataType::Utf8, true),
1082 false,
1083 ));
1084 }
1085
1086 {
1093 arrow_fields.push(Field::new_list(
1094 "my_list",
1095 Field::new("element", DataType::Utf8, false),
1096 true,
1097 ));
1098 }
1099
1100 {
1113 let arrow_inner_list = Field::new("element", DataType::Int32, false);
1114 arrow_fields.push(Field::new_list(
1115 "array_of_arrays",
1116 Field::new_list("element", arrow_inner_list, false),
1117 true,
1118 ));
1119 }
1120
1121 {
1128 arrow_fields.push(Field::new_list(
1129 "my_list",
1130 Field::new("str", DataType::Utf8, false),
1131 true,
1132 ));
1133 }
1134
1135 {
1140 arrow_fields.push(Field::new_list(
1141 "my_list",
1142 Field::new("element", DataType::Int32, false),
1143 true,
1144 ));
1145 }
1146
1147 {
1155 let fields = vec![
1156 Field::new("str", DataType::Utf8, false),
1157 Field::new("num", DataType::Int32, false),
1158 ];
1159 arrow_fields.push(Field::new_list(
1160 "my_list",
1161 Field::new_struct("element", fields, false),
1162 true,
1163 ));
1164 }
1165
1166 {
1174 let fields = vec![Field::new("str", DataType::Utf8, false)];
1175 arrow_fields.push(Field::new_list(
1176 "my_list",
1177 Field::new_struct("array", fields, false),
1178 true,
1179 ));
1180 }
1181
1182 {
1190 let fields = vec![Field::new("str", DataType::Utf8, false)];
1191 arrow_fields.push(Field::new_list(
1192 "my_list",
1193 Field::new_struct("my_list_tuple", fields, false),
1194 true,
1195 ));
1196 }
1197
1198 {
1201 arrow_fields.push(Field::new_list(
1202 "name",
1203 Field::new("name", DataType::Int32, false),
1204 false,
1205 ));
1206 }
1207
1208 let parquet_group_type = parse_message_type(message_type).unwrap();
1209
1210 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1211 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1212 let converted_fields = converted_arrow_schema.fields();
1213
1214 assert_eq!(arrow_fields.len(), converted_fields.len());
1215 for i in 0..arrow_fields.len() {
1216 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1217 }
1218 }
1219
1220 #[test]
1221 fn test_parquet_list_nullable() {
1222 let mut arrow_fields = Vec::new();
1223
1224 let message_type = "
1225 message test_schema {
1226 REQUIRED GROUP my_list1 (LIST) {
1227 REPEATED GROUP list {
1228 OPTIONAL BINARY element (UTF8);
1229 }
1230 }
1231 OPTIONAL GROUP my_list2 (LIST) {
1232 REPEATED GROUP list {
1233 REQUIRED BINARY element (UTF8);
1234 }
1235 }
1236 REQUIRED GROUP my_list3 (LIST) {
1237 REPEATED GROUP list {
1238 REQUIRED BINARY element (UTF8);
1239 }
1240 }
1241 }
1242 ";
1243
1244 {
1251 arrow_fields.push(Field::new_list(
1252 "my_list1",
1253 Field::new("element", DataType::Utf8, true),
1254 false,
1255 ));
1256 }
1257
1258 {
1265 arrow_fields.push(Field::new_list(
1266 "my_list2",
1267 Field::new("element", DataType::Utf8, false),
1268 true,
1269 ));
1270 }
1271
1272 {
1279 arrow_fields.push(Field::new_list(
1280 "my_list3",
1281 Field::new("element", DataType::Utf8, false),
1282 false,
1283 ));
1284 }
1285
1286 let parquet_group_type = parse_message_type(message_type).unwrap();
1287
1288 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1289 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1290 let converted_fields = converted_arrow_schema.fields();
1291
1292 assert_eq!(arrow_fields.len(), converted_fields.len());
1293 for i in 0..arrow_fields.len() {
1294 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1295 }
1296 }
1297
1298 #[test]
1299 fn test_parquet_maps() {
1300 let mut arrow_fields = Vec::new();
1301
1302 let message_type = "
1304 message test_schema {
1305 REQUIRED group my_map1 (MAP) {
1306 REPEATED group key_value {
1307 REQUIRED binary key (UTF8);
1308 OPTIONAL int32 value;
1309 }
1310 }
1311 OPTIONAL group my_map2 (MAP) {
1312 REPEATED group map {
1313 REQUIRED binary str (UTF8);
1314 REQUIRED int32 num;
1315 }
1316 }
1317 OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1318 REPEATED group map {
1319 REQUIRED binary key (UTF8);
1320 OPTIONAL int32 value;
1321 }
1322 }
1323 REQUIRED group my_map4 (MAP) {
1324 REPEATED group map {
1325 OPTIONAL binary key (UTF8);
1326 REQUIRED int32 value;
1327 }
1328 }
1329 }
1330 ";
1331
1332 {
1340 arrow_fields.push(Field::new_map(
1341 "my_map1",
1342 "key_value",
1343 Field::new("key", DataType::Utf8, false),
1344 Field::new("value", DataType::Int32, true),
1345 false,
1346 false,
1347 ));
1348 }
1349
1350 {
1358 arrow_fields.push(Field::new_map(
1359 "my_map2",
1360 "map",
1361 Field::new("str", DataType::Utf8, false),
1362 Field::new("num", DataType::Int32, false),
1363 false,
1364 true,
1365 ));
1366 }
1367
1368 {
1376 arrow_fields.push(Field::new_map(
1377 "my_map3",
1378 "map",
1379 Field::new("key", DataType::Utf8, false),
1380 Field::new("value", DataType::Int32, true),
1381 false,
1382 true,
1383 ));
1384 }
1385
1386 {
1394 arrow_fields.push(Field::new_map(
1395 "my_map4",
1396 "map",
1397 Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Int32, false),
1399 false,
1400 false,
1401 ));
1402 }
1403
1404 let parquet_group_type = parse_message_type(message_type).unwrap();
1405
1406 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1407 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1408 let converted_fields = converted_arrow_schema.fields();
1409
1410 assert_eq!(arrow_fields.len(), converted_fields.len());
1411 for i in 0..arrow_fields.len() {
1412 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1413 }
1414 }
1415
1416 #[test]
1417 fn test_nested_schema() {
1418 let mut arrow_fields = Vec::new();
1419 {
1420 let group1_fields = Fields::from(vec![
1421 Field::new("leaf1", DataType::Boolean, false),
1422 Field::new("leaf2", DataType::Int32, false),
1423 ]);
1424 let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1425 arrow_fields.push(group1_struct);
1426
1427 let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1428 arrow_fields.push(leaf3_field);
1429 }
1430
1431 let message_type = "
1432 message test_schema {
1433 REQUIRED GROUP group1 {
1434 REQUIRED BOOLEAN leaf1;
1435 REQUIRED INT32 leaf2;
1436 }
1437 REQUIRED INT64 leaf3;
1438 }
1439 ";
1440 let parquet_group_type = parse_message_type(message_type).unwrap();
1441
1442 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1443 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1444 let converted_fields = converted_arrow_schema.fields();
1445
1446 assert_eq!(arrow_fields.len(), converted_fields.len());
1447 for i in 0..arrow_fields.len() {
1448 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1449 }
1450 }
1451
1452 #[test]
1453 fn test_nested_schema_partial() {
1454 let mut arrow_fields = Vec::new();
1455 {
1456 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1457 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1458 arrow_fields.push(group1);
1459
1460 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1461 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1462 arrow_fields.push(group2);
1463
1464 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1465 }
1466
1467 let message_type = "
1468 message test_schema {
1469 REQUIRED GROUP group1 {
1470 REQUIRED INT64 leaf1;
1471 REQUIRED INT64 leaf2;
1472 }
1473 REQUIRED GROUP group2 {
1474 REQUIRED INT64 leaf3;
1475 REQUIRED INT64 leaf4;
1476 }
1477 REQUIRED INT64 leaf5;
1478 }
1479 ";
1480 let parquet_group_type = parse_message_type(message_type).unwrap();
1481
1482 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1492 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1493 let converted_arrow_schema =
1494 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1495 let converted_fields = converted_arrow_schema.fields();
1496
1497 assert_eq!(arrow_fields.len(), converted_fields.len());
1498 for i in 0..arrow_fields.len() {
1499 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1500 }
1501 }
1502
1503 #[test]
1504 fn test_nested_schema_partial_ordering() {
1505 let mut arrow_fields = Vec::new();
1506 {
1507 let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1508 let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1509 arrow_fields.push(group1);
1510
1511 let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1512 let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1513 arrow_fields.push(group2);
1514
1515 arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1516 }
1517
1518 let message_type = "
1519 message test_schema {
1520 REQUIRED GROUP group1 {
1521 REQUIRED INT64 leaf1;
1522 REQUIRED INT64 leaf2;
1523 }
1524 REQUIRED GROUP group2 {
1525 REQUIRED INT64 leaf3;
1526 REQUIRED INT64 leaf4;
1527 }
1528 REQUIRED INT64 leaf5;
1529 }
1530 ";
1531 let parquet_group_type = parse_message_type(message_type).unwrap();
1532
1533 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1543 let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1544 let converted_arrow_schema =
1545 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1546 let converted_fields = converted_arrow_schema.fields();
1547
1548 assert_eq!(arrow_fields.len(), converted_fields.len());
1549 for i in 0..arrow_fields.len() {
1550 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1551 }
1552
1553 let mask =
1554 ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1555 let converted_arrow_schema =
1556 parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1557 let converted_fields = converted_arrow_schema.fields();
1558
1559 assert_eq!(arrow_fields.len(), converted_fields.len());
1560 for i in 0..arrow_fields.len() {
1561 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1562 }
1563 }
1564
1565 #[test]
1566 fn test_repeated_nested_schema() {
1567 let mut arrow_fields = Vec::new();
1568 {
1569 arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1570
1571 let inner_group_list = Field::new_list(
1572 "innerGroup",
1573 Field::new_struct(
1574 "innerGroup",
1575 vec![Field::new("leaf3", DataType::Int32, true)],
1576 false,
1577 ),
1578 false,
1579 );
1580
1581 let outer_group_list = Field::new_list(
1582 "outerGroup",
1583 Field::new_struct(
1584 "outerGroup",
1585 vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1586 false,
1587 ),
1588 false,
1589 );
1590 arrow_fields.push(outer_group_list);
1591 }
1592
1593 let message_type = "
1594 message test_schema {
1595 OPTIONAL INT32 leaf1;
1596 REPEATED GROUP outerGroup {
1597 OPTIONAL INT32 leaf2;
1598 REPEATED GROUP innerGroup {
1599 OPTIONAL INT32 leaf3;
1600 }
1601 }
1602 }
1603 ";
1604 let parquet_group_type = parse_message_type(message_type).unwrap();
1605
1606 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1607 let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1608 let converted_fields = converted_arrow_schema.fields();
1609
1610 assert_eq!(arrow_fields.len(), converted_fields.len());
1611 for i in 0..arrow_fields.len() {
1612 assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1613 }
1614 }
1615
1616 #[test]
1617 fn test_column_desc_to_field() {
1618 let message_type = "
1619 message test_schema {
1620 REQUIRED BOOLEAN boolean;
1621 REQUIRED INT32 int8 (INT_8);
1622 REQUIRED INT32 uint8 (INTEGER(8,false));
1623 REQUIRED INT32 int16 (INT_16);
1624 REQUIRED INT32 uint16 (INTEGER(16,false));
1625 REQUIRED INT32 int32;
1626 REQUIRED INT64 int64;
1627 OPTIONAL DOUBLE double;
1628 OPTIONAL FLOAT float;
1629 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1630 OPTIONAL BINARY string (UTF8);
1631 REPEATED BOOLEAN bools;
1632 OPTIONAL INT32 date (DATE);
1633 OPTIONAL INT32 time_milli (TIME_MILLIS);
1634 OPTIONAL INT64 time_micro (TIME_MICROS);
1635 OPTIONAL INT64 time_nano (TIME(NANOS,false));
1636 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1637 REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1638 REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1639 REPEATED INT32 int_list;
1640 REPEATED BINARY byte_list;
1641 REPEATED BINARY string_list (UTF8);
1642 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1643 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1644 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1645 }
1646 ";
1647 let parquet_group_type = parse_message_type(message_type).unwrap();
1648
1649 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1650 let converted_arrow_fields = parquet_schema
1651 .columns()
1652 .iter()
1653 .map(|c| parquet_to_arrow_field(c).unwrap())
1654 .collect::<Vec<Field>>();
1655
1656 let arrow_fields = vec![
1657 Field::new("boolean", DataType::Boolean, false),
1658 Field::new("int8", DataType::Int8, false),
1659 Field::new("uint8", DataType::UInt8, false),
1660 Field::new("int16", DataType::Int16, false),
1661 Field::new("uint16", DataType::UInt16, false),
1662 Field::new("int32", DataType::Int32, false),
1663 Field::new("int64", DataType::Int64, false),
1664 Field::new("double", DataType::Float64, true),
1665 Field::new("float", DataType::Float32, true),
1666 Field::new("float16", DataType::Float16, true),
1667 Field::new("string", DataType::Utf8, true),
1668 Field::new_list(
1669 "bools",
1670 Field::new("bools", DataType::Boolean, false),
1671 false,
1672 ),
1673 Field::new("date", DataType::Date32, true),
1674 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1675 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1676 Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1677 Field::new(
1678 "ts_milli",
1679 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1680 true,
1681 ),
1682 Field::new(
1683 "ts_micro",
1684 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1685 false,
1686 ),
1687 Field::new(
1688 "ts_nano",
1689 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1690 false,
1691 ),
1692 Field::new_list(
1693 "int_list",
1694 Field::new("int_list", DataType::Int32, false),
1695 false,
1696 ),
1697 Field::new_list(
1698 "byte_list",
1699 Field::new("byte_list", DataType::Binary, false),
1700 false,
1701 ),
1702 Field::new_list(
1703 "string_list",
1704 Field::new("string_list", DataType::Utf8, false),
1705 false,
1706 ),
1707 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1708 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1709 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1710 ];
1711
1712 assert_eq!(arrow_fields, converted_arrow_fields);
1713 }
1714
1715 #[test]
1716 fn test_coerced_map_list() {
1717 let arrow_fields = vec![
1719 Field::new_list(
1720 "my_list",
1721 Field::new("item", DataType::Boolean, true),
1722 false,
1723 ),
1724 Field::new_map(
1725 "my_map",
1726 "entries",
1727 Field::new("keys", DataType::Utf8, false),
1728 Field::new("values", DataType::Int32, true),
1729 false,
1730 true,
1731 ),
1732 ];
1733 let arrow_schema = Schema::new(arrow_fields);
1734
1735 let message_type = "
1737 message parquet_schema {
1738 REQUIRED GROUP my_list (LIST) {
1739 REPEATED GROUP list {
1740 OPTIONAL BOOLEAN element;
1741 }
1742 }
1743 OPTIONAL GROUP my_map (MAP) {
1744 REPEATED GROUP key_value {
1745 REQUIRED BINARY key (STRING);
1746 OPTIONAL INT32 value;
1747 }
1748 }
1749 }
1750 ";
1751 let parquet_group_type = parse_message_type(message_type).unwrap();
1752 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1753 let converted_arrow_schema = ArrowSchemaConverter::new()
1754 .with_coerce_types(true)
1755 .convert(&arrow_schema)
1756 .unwrap();
1757 assert_eq!(
1758 parquet_schema.columns().len(),
1759 converted_arrow_schema.columns().len()
1760 );
1761
1762 let message_type = "
1764 message parquet_schema {
1765 REQUIRED GROUP my_list (LIST) {
1766 REPEATED GROUP list {
1767 OPTIONAL BOOLEAN item;
1768 }
1769 }
1770 OPTIONAL GROUP my_map (MAP) {
1771 REPEATED GROUP entries {
1772 REQUIRED BINARY keys (STRING);
1773 OPTIONAL INT32 values;
1774 }
1775 }
1776 }
1777 ";
1778 let parquet_group_type = parse_message_type(message_type).unwrap();
1779 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1780 let converted_arrow_schema = ArrowSchemaConverter::new()
1781 .with_coerce_types(false)
1782 .convert(&arrow_schema)
1783 .unwrap();
1784 assert_eq!(
1785 parquet_schema.columns().len(),
1786 converted_arrow_schema.columns().len()
1787 );
1788 }
1789
1790 #[test]
1791 fn test_field_to_column_desc() {
1792 let message_type = "
1793 message arrow_schema {
1794 REQUIRED BOOLEAN boolean;
1795 REQUIRED INT32 int8 (INT_8);
1796 REQUIRED INT32 int16 (INTEGER(16,true));
1797 REQUIRED INT32 int32;
1798 REQUIRED INT64 int64;
1799 OPTIONAL DOUBLE double;
1800 OPTIONAL FLOAT float;
1801 OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1802 OPTIONAL BINARY string (STRING);
1803 OPTIONAL GROUP bools (LIST) {
1804 REPEATED GROUP list {
1805 OPTIONAL BOOLEAN element;
1806 }
1807 }
1808 REQUIRED GROUP bools_non_null (LIST) {
1809 REPEATED GROUP list {
1810 REQUIRED BOOLEAN element;
1811 }
1812 }
1813 OPTIONAL INT32 date (DATE);
1814 OPTIONAL INT32 time_milli (TIME(MILLIS,false));
1815 OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
1816 OPTIONAL INT64 time_micro (TIME_MICROS);
1817 OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
1818 OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1819 REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
1820 REQUIRED INT64 ts_seconds;
1821 REQUIRED INT64 ts_micro_utc (TIMESTAMP(MICROS, true));
1822 REQUIRED INT64 ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1823 REQUIRED INT64 ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1824 REQUIRED INT64 ts_micro_non_utc (TIMESTAMP(MICROS, true));
1825 REQUIRED GROUP struct {
1826 REQUIRED BOOLEAN bools;
1827 REQUIRED INT32 uint32 (INTEGER(32,false));
1828 REQUIRED GROUP int32 (LIST) {
1829 REPEATED GROUP list {
1830 OPTIONAL INT32 element;
1831 }
1832 }
1833 }
1834 REQUIRED BINARY dictionary_strings (STRING);
1835 REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1836 REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1837 REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1838 REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1839 REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1840 }
1841 ";
1842 let parquet_group_type = parse_message_type(message_type).unwrap();
1843
1844 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1845
1846 let arrow_fields = vec![
1847 Field::new("boolean", DataType::Boolean, false),
1848 Field::new("int8", DataType::Int8, false),
1849 Field::new("int16", DataType::Int16, false),
1850 Field::new("int32", DataType::Int32, false),
1851 Field::new("int64", DataType::Int64, false),
1852 Field::new("double", DataType::Float64, true),
1853 Field::new("float", DataType::Float32, true),
1854 Field::new("float16", DataType::Float16, true),
1855 Field::new("string", DataType::Utf8, true),
1856 Field::new_list(
1857 "bools",
1858 Field::new("element", DataType::Boolean, true),
1859 true,
1860 ),
1861 Field::new_list(
1862 "bools_non_null",
1863 Field::new("element", DataType::Boolean, false),
1864 false,
1865 ),
1866 Field::new("date", DataType::Date32, true),
1867 Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1868 Field::new(
1869 "time_milli_utc",
1870 DataType::Time32(TimeUnit::Millisecond),
1871 true,
1872 )
1873 .with_metadata(HashMap::from_iter(vec![(
1874 "adjusted_to_utc".to_string(),
1875 "".to_string(),
1876 )])),
1877 Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1878 Field::new(
1879 "time_micro_utc",
1880 DataType::Time64(TimeUnit::Microsecond),
1881 true,
1882 )
1883 .with_metadata(HashMap::from_iter(vec![(
1884 "adjusted_to_utc".to_string(),
1885 "".to_string(),
1886 )])),
1887 Field::new(
1888 "ts_milli",
1889 DataType::Timestamp(TimeUnit::Millisecond, None),
1890 true,
1891 ),
1892 Field::new(
1893 "ts_micro",
1894 DataType::Timestamp(TimeUnit::Microsecond, None),
1895 false,
1896 ),
1897 Field::new(
1898 "ts_seconds",
1899 DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1900 false,
1901 ),
1902 Field::new(
1903 "ts_micro_utc",
1904 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1905 false,
1906 ),
1907 Field::new(
1908 "ts_millis_zero_offset",
1909 DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1910 false,
1911 ),
1912 Field::new(
1913 "ts_millis_zero_negative_offset",
1914 DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1915 false,
1916 ),
1917 Field::new(
1918 "ts_micro_non_utc",
1919 DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1920 false,
1921 ),
1922 Field::new_struct(
1923 "struct",
1924 vec![
1925 Field::new("bools", DataType::Boolean, false),
1926 Field::new("uint32", DataType::UInt32, false),
1927 Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1928 ],
1929 false,
1930 ),
1931 Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1932 Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1933 Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1934 Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1935 Field::new("decimal128", DataType::Decimal128(38, 2), false),
1936 Field::new("decimal256", DataType::Decimal256(39, 2), false),
1937 ];
1938 let arrow_schema = Schema::new(arrow_fields);
1939 let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1940
1941 assert_eq!(
1942 parquet_schema.columns().len(),
1943 converted_arrow_schema.columns().len()
1944 );
1945 parquet_schema
1946 .columns()
1947 .iter()
1948 .zip(converted_arrow_schema.columns())
1949 .for_each(|(a, b)| {
1950 match a.logical_type_ref() {
1955 Some(_) => {
1956 assert_eq!(a, b)
1957 }
1958 None => {
1959 assert_eq!(a.name(), b.name());
1960 assert_eq!(a.physical_type(), b.physical_type());
1961 assert_eq!(a.converted_type(), b.converted_type());
1962 }
1963 };
1964 });
1965 }
1966
1967 #[test]
1968 #[should_panic(expected = "Parquet does not support writing empty structs")]
1969 fn test_empty_struct_field() {
1970 let arrow_fields = vec![Field::new(
1971 "struct",
1972 DataType::Struct(Fields::empty()),
1973 false,
1974 )];
1975 let arrow_schema = Schema::new(arrow_fields);
1976 let converted_arrow_schema = ArrowSchemaConverter::new()
1977 .with_coerce_types(true)
1978 .convert(&arrow_schema);
1979
1980 converted_arrow_schema.unwrap();
1981 }
1982
1983 #[test]
1984 fn test_metadata() {
1985 let message_type = "
1986 message test_schema {
1987 OPTIONAL BINARY string (STRING);
1988 }
1989 ";
1990 let parquet_group_type = parse_message_type(message_type).unwrap();
1991
1992 let key_value_metadata = vec![
1993 KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1994 KeyValue::new("baz".to_owned(), None),
1995 ];
1996
1997 let mut expected_metadata: HashMap<String, String> = HashMap::new();
1998 expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1999
2000 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
2001 let converted_arrow_schema =
2002 parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
2003
2004 assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
2005 }
2006
2007 #[test]
2008 fn test_arrow_schema_roundtrip() -> Result<()> {
2009 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2010 a.iter()
2011 .map(|(a, b)| (a.to_string(), b.to_string()))
2012 .collect()
2013 };
2014
2015 let schema = Schema::new_with_metadata(
2016 vec![
2017 Field::new("c1", DataType::Utf8, false)
2018 .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
2019 Field::new("c2", DataType::Binary, false),
2020 Field::new("c3", DataType::FixedSizeBinary(3), false),
2021 Field::new("c4", DataType::Boolean, false),
2022 Field::new("c5", DataType::Date32, false),
2023 Field::new("c6", DataType::Date64, false),
2024 Field::new("c7", DataType::Time32(TimeUnit::Second), false),
2025 Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
2026 Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
2027 Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
2028 Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
2029 Field::new(
2030 "c16",
2031 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
2032 false,
2033 ),
2034 Field::new(
2035 "c17",
2036 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
2037 false,
2038 ),
2039 Field::new(
2040 "c18",
2041 DataType::Timestamp(TimeUnit::Nanosecond, None),
2042 false,
2043 ),
2044 Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
2045 Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
2046 Field::new_list(
2047 "c21",
2048 Field::new_list_field(DataType::Boolean, true)
2049 .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
2050 false,
2051 )
2052 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
2053 Field::new(
2054 "c22",
2055 DataType::FixedSizeList(
2056 Arc::new(Field::new_list_field(DataType::Boolean, true)),
2057 5,
2058 ),
2059 false,
2060 ),
2061 Field::new_list(
2062 "c23",
2063 Field::new_large_list(
2064 "inner",
2065 Field::new_list_field(
2066 DataType::Struct(
2067 vec![
2068 Field::new("a", DataType::Int16, true),
2069 Field::new("b", DataType::Float64, false),
2070 Field::new("c", DataType::Float32, false),
2071 Field::new("d", DataType::Float16, false),
2072 ]
2073 .into(),
2074 ),
2075 false,
2076 ),
2077 true,
2078 ),
2079 false,
2080 ),
2081 Field::new(
2082 "c24",
2083 DataType::Struct(Fields::from(vec![
2084 Field::new("a", DataType::Utf8, false),
2085 Field::new("b", DataType::UInt16, false),
2086 ])),
2087 false,
2088 ),
2089 Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2090 Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2091 #[allow(deprecated)]
2097 Field::new_dict(
2098 "c31",
2099 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2100 true,
2101 123,
2102 true,
2103 )
2104 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2105 Field::new("c32", DataType::LargeBinary, true),
2106 Field::new("c33", DataType::LargeUtf8, true),
2107 Field::new_large_list(
2108 "c34",
2109 Field::new_list(
2110 "inner",
2111 Field::new_list_field(
2112 DataType::Struct(
2113 vec![
2114 Field::new("a", DataType::Int16, true),
2115 Field::new("b", DataType::Float64, true),
2116 ]
2117 .into(),
2118 ),
2119 true,
2120 ),
2121 true,
2122 ),
2123 true,
2124 ),
2125 Field::new("c35", DataType::Null, true),
2126 Field::new("c36", DataType::Decimal128(2, 1), false),
2127 Field::new("c37", DataType::Decimal256(50, 20), false),
2128 Field::new("c38", DataType::Decimal128(18, 12), true),
2129 Field::new_map(
2130 "c39",
2131 "key_value",
2132 Field::new("key", DataType::Utf8, false),
2133 Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2134 false, true,
2136 ),
2137 Field::new_map(
2138 "c40",
2139 "my_entries",
2140 Field::new("my_key", DataType::Utf8, false)
2141 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2142 Field::new_list(
2143 "my_value",
2144 Field::new_list_field(DataType::Utf8, true)
2145 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2146 true,
2147 )
2148 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2149 false, true,
2151 )
2152 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2153 Field::new_map(
2154 "c41",
2155 "my_entries",
2156 Field::new("my_key", DataType::Utf8, false),
2157 Field::new_list(
2158 "my_value",
2159 Field::new_list_field(DataType::Utf8, true)
2160 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2161 true,
2162 ),
2163 false, false,
2165 ),
2166 Field::new("c42", DataType::Decimal32(5, 2), false),
2167 Field::new("c43", DataType::Decimal64(18, 12), true),
2168 ],
2169 meta(&[("Key", "Value")]),
2170 );
2171
2172 let file = tempfile::tempfile().unwrap();
2174 let writer =
2175 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2176 writer.close()?;
2177
2178 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2180
2181 let read_schema = arrow_reader.schema();
2183 assert_eq!(&schema, read_schema.as_ref());
2184
2185 let mut stack = Vec::with_capacity(10);
2187 let mut out = Vec::with_capacity(10);
2188
2189 let root = arrow_reader.parquet_schema().root_schema_ptr();
2190 stack.push((root.name().to_string(), root));
2191
2192 while let Some((p, t)) = stack.pop() {
2193 if t.is_group() {
2194 for f in t.get_fields() {
2195 stack.push((format!("{p}.{}", f.name()), f.clone()))
2196 }
2197 }
2198
2199 let info = t.get_basic_info();
2200 if info.has_id() {
2201 out.push(format!("{p} -> {}", info.id()))
2202 }
2203 }
2204 out.sort_unstable();
2205 let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2206
2207 assert_eq!(
2208 &out,
2209 &[
2210 "arrow_schema.c1 -> 2",
2211 "arrow_schema.c21 -> 4",
2212 "arrow_schema.c21.list.item -> 5",
2213 "arrow_schema.c31 -> 6",
2214 "arrow_schema.c40 -> 7",
2215 "arrow_schema.c40.my_entries.my_key -> 8",
2216 "arrow_schema.c40.my_entries.my_value -> 9",
2217 "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2218 "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2219 ]
2220 );
2221
2222 Ok(())
2223 }
2224
2225 #[test]
2226 fn test_read_parquet_field_ids_raw() -> Result<()> {
2227 let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2228 a.iter()
2229 .map(|(a, b)| (a.to_string(), b.to_string()))
2230 .collect()
2231 };
2232 let schema = Schema::new_with_metadata(
2233 vec![
2234 Field::new("c1", DataType::Utf8, true)
2235 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2236 Field::new("c2", DataType::Utf8, true)
2237 .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2238 ],
2239 HashMap::new(),
2240 );
2241
2242 let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2243 let parquet_bytes = writer.into_inner()?;
2244
2245 let reader =
2246 crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2247 let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2248
2249 let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2251
2252 let parq_schema_descr = ArrowSchemaConverter::new()
2253 .with_coerce_types(true)
2254 .convert(&arrow_schema)?;
2255 let parq_fields = parq_schema_descr.root_schema().get_fields();
2256 assert_eq!(parq_fields.len(), 2);
2257 assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2258 assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2259
2260 Ok(())
2261 }
2262
2263 #[test]
2264 fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2265 let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2266 .iter()
2267 .cloned()
2268 .collect();
2269
2270 let schema = Schema::new_with_metadata(
2271 vec![
2272 Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2273 Field::new(
2274 "c22",
2275 DataType::FixedSizeList(
2276 Arc::new(Field::new("items", DataType::Boolean, false)),
2277 5,
2278 ),
2279 false,
2280 ),
2281 Field::new_list(
2282 "c23",
2283 Field::new_large_list(
2284 "items",
2285 Field::new_struct(
2286 "items",
2287 vec![
2288 Field::new("a", DataType::Int16, true),
2289 Field::new("b", DataType::Float64, false),
2290 ],
2291 true,
2292 ),
2293 true,
2294 ),
2295 true,
2296 ),
2297 ],
2298 metadata,
2299 );
2300
2301 let file = tempfile::tempfile().unwrap();
2303 let writer =
2304 ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2305 writer.close()?;
2306
2307 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2309 let read_schema = arrow_reader.schema();
2310 assert_eq!(&schema, read_schema.as_ref());
2311 Ok(())
2312 }
2313
2314 #[test]
2315 fn test_get_arrow_schema_from_metadata() {
2316 assert!(get_arrow_schema_from_metadata("").is_err());
2317 }
2318
2319 #[test]
2320 #[cfg(feature = "arrow_canonical_extension_types")]
2321 fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2322 use arrow_schema::extension::Uuid;
2323 let arrow_schema = Schema::new(vec![
2324 Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2325 ]);
2326
2327 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2328
2329 assert_eq!(
2330 parquet_schema.column(0).logical_type_ref(),
2331 Some(&LogicalType::Uuid)
2332 );
2333
2334 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2335 assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2336
2337 Ok(())
2338 }
2339
2340 #[test]
2341 #[cfg(feature = "arrow_canonical_extension_types")]
2342 fn arrow_json_to_parquet_json() -> Result<()> {
2343 use arrow_schema::extension::Json;
2344 let arrow_schema = Schema::new(vec![
2345 Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2346 ]);
2347
2348 let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2349
2350 assert_eq!(
2351 parquet_schema.column(0).logical_type_ref(),
2352 Some(&LogicalType::Json)
2353 );
2354
2355 let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2356 assert_eq!(
2357 arrow_schema.field(0).try_extension_type::<Json>()?,
2358 Json::default()
2359 );
2360
2361 Ok(())
2362 }
2363
2364 #[test]
2365 fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
2366 let message_type = "
2367 message test_schema {
2368 REQUIRED INT32 id;
2369 }
2370 ";
2371 let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
2372 let descriptor = SchemaDescriptor::new(parquet_schema);
2373
2374 let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
2376 let result = parquet_to_arrow_field_levels_with_virtual(
2377 &descriptor,
2378 ProjectionMask::all(),
2379 None,
2380 &[regular_field],
2381 );
2382
2383 assert!(result.is_err());
2384 assert!(
2385 result
2386 .unwrap_err()
2387 .to_string()
2388 .contains("is not a virtual column")
2389 );
2390 }
2391}