1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32 let info = t.get_basic_info();
33 match info.has_repetition() {
34 true => info.repetition(),
35 false => Repetition::REQUIRED,
36 }
37}
38
/// A node of the converted schema tree: the arrow type a parquet schema node
/// maps to, together with the repetition/definition levels needed to decode it.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level at this node: incremented for each repeated level
    /// (a list, a map, or a `REPEATED` field) on the path from the root,
    /// including this node itself.
    pub rep_level: i16,
    /// Definition level at this node: incremented for each optional or
    /// repeated level on the path from the root, including this node itself.
    pub def_level: i16,
    /// Whether the arrow field produced for this node is nullable.
    pub nullable: bool,
    /// The arrow data type this node is converted to.
    pub arrow_type: DataType,
    /// Whether this node is a leaf column, a group with children, or a
    /// virtual column.
    pub field_type: ParquetFieldType,
}
59
60impl ParquetField {
61 fn into_list(self, name: &str) -> Self {
65 ParquetField {
66 rep_level: self.rep_level,
67 def_level: self.def_level,
68 nullable: false,
69 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70 field_type: ParquetFieldType::Group {
71 children: vec![self],
72 },
73 }
74 }
75
76 fn into_list_with_arrow_list_hint(
83 self,
84 parquet_field_type: &Type,
85 list_data_type: Option<DataType>,
86 ) -> Result<Self, ParquetError> {
87 let arrow_field = match &list_data_type {
88 Some(DataType::List(field_hint))
89 | Some(DataType::LargeList(field_hint))
90 | Some(DataType::FixedSizeList(field_hint, _)) => Some(field_hint.as_ref()),
91 Some(_) => {
92 return Err(general_err!(
93 "Internal error: should be validated earlier that list_data_type is only a type of list"
94 ));
95 }
96 None => None,
97 };
98
99 let arrow_field = convert_field(
100 parquet_field_type,
101 &self,
102 arrow_field,
103 false,
105 )?
106 .with_nullable(false);
107
108 Ok(ParquetField {
109 rep_level: self.rep_level,
110 def_level: self.def_level,
111 nullable: false,
112 arrow_type: match list_data_type {
113 Some(DataType::List(_)) => DataType::List(Arc::new(arrow_field)),
114 Some(DataType::LargeList(_)) => DataType::LargeList(Arc::new(arrow_field)),
115 Some(DataType::FixedSizeList(_, len)) => {
116 DataType::FixedSizeList(Arc::new(arrow_field), len)
117 }
118 _ => DataType::List(Arc::new(arrow_field)),
119 },
120 field_type: ParquetFieldType::Group {
121 children: vec![self],
122 },
123 })
124 }
125
126 pub fn children(&self) -> Option<&[Self]> {
128 match &self.field_type {
129 ParquetFieldType::Primitive { .. } => None,
130 ParquetFieldType::Group { children } => Some(children),
131 ParquetFieldType::Virtual(_) => None,
132 }
133 }
134}
135
/// The kinds of virtual column supported. Virtual columns are not backed by a
/// leaf column of the file; `convert_virtual_field` selects the variant from the
/// arrow field's extension type name.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VirtualColumnType {
    /// Selected by the `RowNumber` extension type; presumably yields each
    /// row's number within the file — TODO confirm against the reader.
    RowNumber,
    /// Selected by the `RowGroupIndex` extension type; presumably yields the
    /// index of the row group containing each row — TODO confirm.
    RowGroupIndex,
}
144
/// The structure of a [`ParquetField`] node.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column stored in the file.
    Primitive {
        /// Index of this leaf among all leaves of the schema, numbered in
        /// depth-first order (leaves excluded by the projection still consume
        /// an index).
        col_idx: usize,
        /// The parquet type of this leaf.
        primitive_type: TypePtr,
    },
    /// A struct, list or map node.
    Group {
        /// The converted children that survived projection, in schema order.
        children: Vec<ParquetField>,
    },
    /// A virtual column that is not backed by a leaf column of the file.
    Virtual(VirtualColumnType),
}
160
/// State handed from a node to the child it is about to visit.
struct VisitorContext {
    /// Repetition level inherited from the enclosing nodes.
    rep_level: i16,
    /// Definition level inherited from the enclosing nodes.
    def_level: i16,
    /// Optional arrow type hint for the node being visited (for example taken
    /// from an embedded arrow schema), used to refine the inferred arrow type.
    data_type: Option<DataType>,

    /// When set, a `REPEATED` node visited with this context is itself treated
    /// as a list: `data_type` must then be a list type, and its element type is
    /// the hint for the repeated node. When unset, `data_type` is used directly.
    treat_repeated_as_list_arrow_hint: bool,
}
179
180impl VisitorContext {
181 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
184 match repetition {
185 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
186 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
187 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
188 }
189 }
190}
191
/// Depth-first walker that converts a parquet schema into a [`ParquetField`]
/// tree.
struct Visitor {
    /// Index to assign to the next leaf column encountered. Every leaf visited
    /// consumes an index, whether or not the projection mask includes it.
    next_col_idx: usize,

    /// Selects which leaf columns are included in the output.
    mask: ProjectionMask,
}
204
205impl Visitor {
206 fn visit_primitive(
207 &mut self,
208 primitive_type: &TypePtr,
209 context: VisitorContext,
210 ) -> Result<Option<ParquetField>> {
211 let col_idx = self.next_col_idx;
212 self.next_col_idx += 1;
213
214 if !self.mask.leaf_included(col_idx) {
215 return Ok(None);
216 }
217
218 let repetition = get_repetition(primitive_type);
219 let (def_level, rep_level, nullable) = context.levels(repetition);
220
221 let primitive_arrow_data_type = match repetition {
222 Repetition::REPEATED if context.treat_repeated_as_list_arrow_hint => {
223 let arrow_field = match &context.data_type {
224 Some(DataType::List(f)) => Some(f.as_ref()),
225 Some(DataType::LargeList(f)) => Some(f.as_ref()),
226 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
227 Some(d) => {
228 return Err(arrow_err!(
229 "incompatible arrow schema, expected list got {} for repeated primitive field",
230 d
231 ));
232 }
233 None => None,
234 };
235
236 arrow_field.map(|f| f.data_type().clone())
237 }
238 _ => context.data_type.clone(),
239 };
240
241 let arrow_type = convert_primitive(primitive_type, primitive_arrow_data_type)?;
242
243 let primitive_field = ParquetField {
244 rep_level,
245 def_level,
246 nullable,
247 arrow_type,
248 field_type: ParquetFieldType::Primitive {
249 primitive_type: primitive_type.clone(),
250 col_idx,
251 },
252 };
253
254 Ok(Some(match repetition {
255 Repetition::REPEATED if context.treat_repeated_as_list_arrow_hint => {
256 primitive_field.into_list_with_arrow_list_hint(primitive_type, context.data_type)?
257 }
258 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
259 _ => primitive_field,
260 }))
261 }
262
263 fn visit_struct(
264 &mut self,
265 struct_type: &TypePtr,
266 context: VisitorContext,
267 ) -> Result<Option<ParquetField>> {
268 let repetition = get_repetition(struct_type);
270 let (def_level, rep_level, nullable) = context.levels(repetition);
271
272 let parquet_fields = struct_type.get_fields();
273
274 let arrow_struct = match repetition {
276 Repetition::REPEATED if context.treat_repeated_as_list_arrow_hint => {
277 let arrow_field = match &context.data_type {
278 Some(DataType::List(f)) => Some(f.as_ref()),
279 Some(DataType::LargeList(f)) => Some(f.as_ref()),
280 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
281 Some(d) => {
282 return Err(arrow_err!(
283 "incompatible arrow schema, expected list got {} for repeated struct field",
284 d
285 ));
286 }
287 None => None,
288 };
289
290 arrow_field.map(|f| f.data_type())
291 }
292 _ => context.data_type.as_ref(),
293 };
294
295 let arrow_fields = match &arrow_struct {
296 Some(DataType::Struct(fields)) => {
297 if fields.len() != parquet_fields.len() {
298 return Err(arrow_err!(
299 "incompatible arrow schema, expected {} struct fields got {}",
300 parquet_fields.len(),
301 fields.len()
302 ));
303 }
304 Some(fields)
305 }
306 Some(d) => {
307 return Err(arrow_err!(
308 "incompatible arrow schema, expected struct got {}",
309 d
310 ));
311 }
312 None => None,
313 };
314
315 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
316 let mut children = Vec::with_capacity(parquet_fields.len());
317
318 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
320 let data_type = match arrow_fields {
321 Some(fields) => {
322 let field = &fields[idx];
323 if field.name() != parquet_field.name() {
324 return Err(arrow_err!(
325 "incompatible arrow schema, expected field named {} got {}",
326 parquet_field.name(),
327 field.name()
328 ));
329 }
330 Some(field.data_type().clone())
331 }
332 None => None,
333 };
334
335 let arrow_field = arrow_fields.map(|x| &*x[idx]);
336 let child_ctx = VisitorContext {
337 rep_level,
338 def_level,
339 data_type,
340
341 treat_repeated_as_list_arrow_hint: true,
348 };
349
350 if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
351 child_fields.push(convert_field(parquet_field, &child, arrow_field, true)?);
354 children.push(child);
355 }
356 }
357
358 if children.is_empty() {
359 return Ok(None);
360 }
361
362 let struct_field = ParquetField {
363 rep_level,
364 def_level,
365 nullable,
366 arrow_type: DataType::Struct(child_fields.finish().fields),
367 field_type: ParquetFieldType::Group { children },
368 };
369
370 Ok(Some(match repetition {
371 Repetition::REPEATED if context.treat_repeated_as_list_arrow_hint => {
372 struct_field.into_list_with_arrow_list_hint(struct_type, context.data_type)?
373 }
374 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
375 _ => struct_field,
376 }))
377 }
378
379 fn visit_map(
380 &mut self,
381 map_type: &TypePtr,
382 context: VisitorContext,
383 ) -> Result<Option<ParquetField>> {
384 let rep_level = context.rep_level + 1;
385 let (def_level, nullable) = match get_repetition(map_type) {
386 Repetition::REQUIRED => (context.def_level + 1, false),
387 Repetition::OPTIONAL => (context.def_level + 2, true),
388 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
389 };
390
391 if map_type.get_fields().len() != 1 {
392 return Err(arrow_err!(
393 "Map field must have exactly one key_value child, found {}",
394 map_type.get_fields().len()
395 ));
396 }
397
398 let map_key_value = &map_type.get_fields()[0];
400 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
401 return Err(arrow_err!("Child of map field must be repeated"));
402 }
403
404 if map_key_value.get_fields().len() == 1 {
407 return self.visit_list(map_type, context);
408 }
409
410 if map_key_value.get_fields().len() != 2 {
411 return Err(arrow_err!(
412 "Child of map field must have two children, found {}",
413 map_key_value.get_fields().len()
414 ));
415 }
416
417 let map_key = &map_key_value.get_fields()[0];
419 let map_value = &map_key_value.get_fields()[1];
420
421 match map_key.get_basic_info().repetition() {
422 Repetition::REPEATED => {
423 return Err(arrow_err!("Map keys cannot be repeated"));
424 }
425 Repetition::REQUIRED | Repetition::OPTIONAL => {
426 }
431 }
432
433 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
434 return Err(arrow_err!("Map values cannot be repeated"));
435 }
436
437 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
439 Some(DataType::Map(field, sorted)) => match field.data_type() {
440 DataType::Struct(fields) => {
441 if fields.len() != 2 {
442 return Err(arrow_err!(
443 "Map data type should contain struct with two children, got {}",
444 fields.len()
445 ));
446 }
447
448 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
449 }
450 d => {
451 return Err(arrow_err!("Map data type should contain struct got {}", d));
452 }
453 },
454 Some(d) => {
455 return Err(arrow_err!(
456 "incompatible arrow schema, expected map got {}",
457 d
458 ));
459 }
460 None => (None, None, None, false),
461 };
462
463 let maybe_key = {
464 let context = VisitorContext {
465 rep_level,
466 def_level,
467 data_type: arrow_key.map(|x| x.data_type().clone()),
468 treat_repeated_as_list_arrow_hint: false,
470 };
471
472 self.dispatch(map_key, context)?
473 };
474
475 let maybe_value = {
476 let context = VisitorContext {
477 rep_level,
478 def_level,
479 data_type: arrow_value.map(|x| x.data_type().clone()),
480 treat_repeated_as_list_arrow_hint: true,
482 };
483
484 self.dispatch(map_value, context)?
485 };
486
487 match (maybe_key, maybe_value) {
489 (Some(key), Some(value)) => {
490 let key_field = Arc::new(
491 convert_field(map_key, &key, arrow_key, true)?
492 .with_nullable(false),
494 );
495 let value_field = Arc::new(convert_field(map_value, &value, arrow_value, true)?);
496 let field_metadata = match arrow_map {
497 Some(field) => field.metadata().clone(),
498 _ => HashMap::default(),
499 };
500
501 let map_field = Field::new_struct(
502 map_key_value.name(),
503 [key_field, value_field],
504 false, )
506 .with_metadata(field_metadata);
507
508 Ok(Some(ParquetField {
509 rep_level,
510 def_level,
511 nullable,
512 arrow_type: DataType::Map(Arc::new(map_field), sorted),
513 field_type: ParquetFieldType::Group {
514 children: vec![key, value],
515 },
516 }))
517 }
518 _ => Ok(None),
519 }
520 }
521
522 fn visit_list(
523 &mut self,
524 list_type: &TypePtr,
525 context: VisitorContext,
526 ) -> Result<Option<ParquetField>> {
527 if list_type.is_primitive() {
528 return Err(arrow_err!(
529 "{:?} is a list type and can't be processed as primitive.",
530 list_type
531 ));
532 }
533
534 let fields = list_type.get_fields();
535 if fields.len() != 1 {
536 return Err(arrow_err!(
537 "list type must have a single child, found {}",
538 fields.len()
539 ));
540 }
541
542 let repeated_field = &fields[0];
543 if get_repetition(repeated_field) != Repetition::REPEATED {
544 return Err(arrow_err!("List child must be repeated"));
545 }
546
547 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
549 Repetition::REQUIRED => (context.def_level, false),
550 Repetition::OPTIONAL => (context.def_level + 1, true),
551 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
552 };
553
554 let arrow_field = match &context.data_type {
555 Some(DataType::List(f)) => Some(f.as_ref()),
556 Some(DataType::LargeList(f)) => Some(f.as_ref()),
557 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
558 Some(DataType::ListView(f)) => Some(f.as_ref()),
559 Some(DataType::LargeListView(f)) => Some(f.as_ref()),
560 Some(d) => {
561 return Err(arrow_err!(
562 "incompatible arrow schema, expected list got {}",
563 d
564 ));
565 }
566 None => None,
567 };
568
569 if repeated_field.is_primitive() {
570 let context = VisitorContext {
577 rep_level: context.rep_level,
578 def_level,
579 data_type: arrow_field.map(|f| f.data_type().clone()),
580 treat_repeated_as_list_arrow_hint: false,
581 };
582
583 return match self.visit_primitive(repeated_field, context) {
584 Ok(Some(mut field)) => {
585 field.nullable = nullable;
587 Ok(Some(field))
588 }
589 r => r,
590 };
591 }
592
593 let items = repeated_field.get_fields();
595 if items.len() != 1
596 || (!repeated_field.is_list()
597 && !repeated_field.has_single_repeated_child()
598 && (repeated_field.name() == "array"
599 || repeated_field.name() == format!("{}_tuple", list_type.name())))
600 {
601 let context = VisitorContext {
609 rep_level: context.rep_level,
610 def_level,
611 data_type: arrow_field.map(|f| f.data_type().clone()),
612 treat_repeated_as_list_arrow_hint: false,
613 };
614
615 return match self.visit_struct(repeated_field, context) {
616 Ok(Some(mut field)) => {
617 field.nullable = nullable;
618 Ok(Some(field))
619 }
620 r => r,
621 };
622 }
623
624 let item_type = &items[0];
626 let rep_level = context.rep_level + 1;
627 let def_level = def_level + 1;
628
629 let new_context = VisitorContext {
630 def_level,
631 rep_level,
632 data_type: arrow_field.map(|f| f.data_type().clone()),
633 treat_repeated_as_list_arrow_hint: true,
634 };
635
636 match self.dispatch(item_type, new_context) {
637 Ok(Some(item)) => {
638 let item_field = Arc::new(convert_field(item_type, &item, arrow_field, true)?);
639
640 let arrow_type = match context.data_type {
642 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
643 Some(DataType::FixedSizeList(_, len)) => {
644 DataType::FixedSizeList(item_field, len)
645 }
646 Some(DataType::ListView(_)) => DataType::ListView(item_field),
647 Some(DataType::LargeListView(_)) => DataType::LargeListView(item_field),
648 _ => DataType::List(item_field),
649 };
650
651 Ok(Some(ParquetField {
652 rep_level,
653 def_level,
654 nullable,
655 arrow_type,
656 field_type: ParquetFieldType::Group {
657 children: vec![item],
658 },
659 }))
660 }
661 r => r,
662 }
663 }
664
665 fn dispatch(
666 &mut self,
667 cur_type: &TypePtr,
668 context: VisitorContext,
669 ) -> Result<Option<ParquetField>> {
670 if cur_type.is_primitive() {
671 self.visit_primitive(cur_type, context)
672 } else {
673 match cur_type.get_basic_info().converted_type() {
674 ConvertedType::LIST => self.visit_list(cur_type, context),
675 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
676 self.visit_map(cur_type, context)
677 }
678 _ => self.visit_struct(cur_type, context),
679 }
680 }
681 }
682}
683
684pub(super) fn convert_virtual_field(
694 arrow_field: &Field,
695 parent_rep_level: i16,
696 parent_def_level: i16,
697) -> Result<ParquetField> {
698 let nullable = arrow_field.is_nullable();
699 let def_level = if nullable {
700 parent_def_level + 1
701 } else {
702 parent_def_level
703 };
704
705 let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
707 ParquetError::ArrowError(format!(
708 "virtual column field '{}' must have an extension type",
709 arrow_field.name()
710 ))
711 })?;
712
713 let virtual_type = match extension_name {
714 RowNumber::NAME => VirtualColumnType::RowNumber,
715 RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
716 _ => {
717 return Err(ParquetError::ArrowError(format!(
718 "unsupported virtual column type '{}' for field '{}'",
719 extension_name,
720 arrow_field.name()
721 )));
722 }
723 };
724
725 Ok(ParquetField {
726 rep_level: parent_rep_level,
727 def_level,
728 nullable,
729 arrow_type: arrow_field.data_type().clone(),
730 field_type: ParquetFieldType::Virtual(virtual_type),
731 })
732}
733
734fn convert_field(
739 parquet_type: &Type,
740 field: &ParquetField,
741 arrow_hint: Option<&Field>,
742 add_field_id: bool,
743) -> Result<Field, ParquetError> {
744 let name = parquet_type.name();
745 let data_type = field.arrow_type.clone();
746 let nullable = field.nullable;
747
748 match arrow_hint {
749 Some(hint) => {
750 #[allow(deprecated)]
752 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
753 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
754 {
755 #[allow(deprecated)]
756 Field::new_dict(name, data_type, nullable, id, ordered)
757 }
758 _ => Field::new(name, data_type, nullable),
759 };
760
761 Ok(field.with_metadata(hint.metadata().clone()))
762 }
763 None => {
764 let mut ret = Field::new(name, data_type, nullable);
765 let basic_info = parquet_type.get_basic_info();
766 if add_field_id && basic_info.has_id() {
767 let mut meta = HashMap::with_capacity(1);
768 meta.insert(
769 PARQUET_FIELD_ID_META_KEY.to_string(),
770 basic_info.id().to_string(),
771 );
772 ret.set_metadata(meta);
773 }
774 try_add_extension_type(ret, parquet_type)
775 }
776 }
777}
778
779pub fn convert_schema(
785 schema: &SchemaDescriptor,
786 mask: ProjectionMask,
787 embedded_arrow_schema: Option<&Fields>,
788) -> Result<Option<ParquetField>> {
789 let mut visitor = Visitor {
790 next_col_idx: 0,
791 mask,
792 };
793
794 let context = VisitorContext {
795 rep_level: 0,
796 def_level: 0,
797 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
798 treat_repeated_as_list_arrow_hint: true,
799 };
800
801 visitor.dispatch(&schema.root_schema_ptr(), context)
802}
803
804pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
806 let mut visitor = Visitor {
807 next_col_idx: 0,
808 mask: ProjectionMask::all(),
809 };
810
811 let context = VisitorContext {
812 rep_level: 0,
813 def_level: 0,
814 data_type: None,
815 treat_repeated_as_list_arrow_hint: false,
817 };
818
819 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
820}
821
822#[cfg(test)]
823mod tests {
824 use crate::arrow::schema::complex::convert_schema;
825 use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
826 use crate::schema::parser::parse_message_type;
827 use crate::schema::types::SchemaDescriptor;
828 use arrow_schema::{DataType, Field, Fields};
829 use std::sync::Arc;
830
831 trait WithFieldId {
832 fn with_field_id(self, id: i32) -> Self;
833 }
834 impl WithFieldId for arrow_schema::Field {
835 fn with_field_id(self, id: i32) -> Self {
836 let mut metadata = self.metadata().clone();
837 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string());
838 self.with_metadata(metadata)
839 }
840 }
841
842 fn test_roundtrip(message_type: &str) -> crate::errors::Result<()> {
843 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
844 let schema = SchemaDescriptor::new(parsed_input_schema);
845
846 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
847
848 let DataType::Struct(schema_fields) = &converted.arrow_type else {
849 panic!("Expected struct from convert_schema");
850 };
851
852 let converted_again =
854 convert_schema(&schema, ProjectionMask::all(), Some(schema_fields))?.unwrap();
855
856 assert_eq!(converted_again.arrow_type, converted.arrow_type);
858
859 Ok(())
860 }
861
862 fn test_expected_type(
863 message_type: &str,
864 expected_fields: Fields,
865 ) -> crate::errors::Result<()> {
866 test_roundtrip(message_type)?;
867
868 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
869 let schema = SchemaDescriptor::new(parsed_input_schema);
870
871 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
872
873 let DataType::Struct(schema_fields) = &converted.arrow_type else {
874 panic!("Expected struct from convert_schema");
875 };
876
877 assert_eq!(schema_fields, &expected_fields);
878
879 Ok(())
880 }
881
882 #[test]
884 fn basic_backward_compatible_list_1() -> crate::errors::Result<()> {
885 test_expected_type(
886 "
887 message schema {
888 optional group my_list (LIST) {
889 repeated int32 element;
890 }
891 }
892 ",
893 Fields::from(vec![
894 Field::new(
896 "my_list",
897 DataType::List(Arc::new(Field::new("element", DataType::Int32, false))),
898 true,
899 ),
900 ]),
901 )
902 }
903
904 #[test]
906 fn basic_backward_compatible_list_2() -> crate::errors::Result<()> {
907 test_expected_type(
908 "
909 message schema {
910 optional group my_list (LIST) {
911 repeated group element {
912 required binary str (STRING);
913 required int32 num;
914 }
915 }
916 }
917 ",
918 Fields::from(vec![
919 Field::new(
921 "my_list",
922 DataType::List(Arc::new(Field::new(
923 "element",
924 DataType::Struct(Fields::from(vec![
925 Field::new("str", DataType::Utf8, false),
926 Field::new("num", DataType::Int32, false),
927 ])),
928 false,
929 ))),
930 true,
931 ),
932 ]),
933 )
934 }
935
936 #[test]
938 fn basic_backward_compatible_list_3() -> crate::errors::Result<()> {
939 test_expected_type(
940 "
941 message schema {
942 optional group my_list (LIST) {
943 repeated group array (LIST) {
944 repeated int32 array;
945 }
946 }
947 }
948 ",
949 Fields::from(vec![
950 Field::new(
952 "my_list",
953 DataType::List(Arc::new(Field::new(
954 "array",
955 DataType::List(Arc::new(Field::new("array", DataType::Int32, false))),
956 false,
957 ))),
958 true,
959 ),
960 ]),
961 )
962 }
963
964 #[test]
966 fn basic_backward_compatible_list_4_1() -> crate::errors::Result<()> {
967 test_expected_type(
968 "
969 message schema {
970 optional group my_list (LIST) {
971 repeated group array {
972 required binary str (STRING);
973 }
974 }
975 }
976 ",
977 Fields::from(vec![
978 Field::new(
980 "my_list",
981 DataType::List(Arc::new(Field::new(
982 "array",
983 DataType::Struct(Fields::from(vec![Field::new(
984 "str",
985 DataType::Utf8,
986 false,
987 )])),
988 false,
989 ))),
990 true,
991 ),
992 ]),
993 )
994 }
995
996 #[test]
998 fn basic_backward_compatible_list_4_2() -> crate::errors::Result<()> {
999 test_expected_type(
1000 "
1001 message schema {
1002 optional group my_list (LIST) {
1003 repeated group my_list_tuple {
1004 required binary str (STRING);
1005 }
1006 }
1007 }
1008 ",
1009 Fields::from(vec![
1010 Field::new(
1012 "my_list",
1013 DataType::List(Arc::new(Field::new(
1014 "my_list_tuple",
1015 DataType::Struct(Fields::from(vec![Field::new(
1016 "str",
1017 DataType::Utf8,
1018 false,
1019 )])),
1020 false,
1021 ))),
1022 true,
1023 ),
1024 ]),
1025 )
1026 }
1027
1028 #[test]
1030 fn basic_backward_compatible_list_5() -> crate::errors::Result<()> {
1031 test_expected_type(
1032 "
1033 message schema {
1034 optional group my_list (LIST) {
1035 repeated group element {
1036 optional binary str (STRING);
1037 }
1038 }
1039 }
1040 ",
1041 Fields::from(vec![
1042 Field::new(
1044 "my_list",
1045 DataType::List(Arc::new(Field::new("str", DataType::Utf8, true))),
1046 true,
1047 ),
1048 ]),
1049 )
1050 }
1051
1052 #[test]
1053 fn basic_backward_compatible_map_1() -> crate::errors::Result<()> {
1054 test_expected_type(
1055 "
1056 message schema {
1057 optional group my_map (MAP) {
1058 repeated group map {
1059 required binary str (STRING);
1060 required int32 num;
1061 }
1062 }
1063 }
1064 ",
1065 Fields::from(vec![
1066 Field::new(
1068 "my_map",
1069 DataType::Map(
1070 Arc::new(Field::new(
1071 "map",
1072 DataType::Struct(Fields::from(vec![
1073 Field::new("str", DataType::Utf8, false),
1074 Field::new("num", DataType::Int32, false),
1075 ])),
1076 false,
1077 )),
1078 false,
1079 ),
1080 true,
1081 ),
1082 ]),
1083 )
1084 }
1085
1086 #[test]
1087 fn basic_backward_compatible_map_2() -> crate::errors::Result<()> {
1088 test_expected_type(
1089 "
1090 message schema {
1091 optional group my_map (MAP_KEY_VALUE) {
1092 repeated group map {
1093 required binary key (STRING);
1094 optional int32 value;
1095 }
1096 }
1097 }
1098 ",
1099 Fields::from(vec![
1100 Field::new(
1102 "my_map",
1103 DataType::Map(
1104 Arc::new(Field::new(
1105 "map",
1106 DataType::Struct(Fields::from(vec![
1107 Field::new("key", DataType::Utf8, false),
1108 Field::new("value", DataType::Int32, true),
1109 ])),
1110 false,
1111 )),
1112 false,
1113 ),
1114 true,
1115 ),
1116 ]),
1117 )
1118 }
1119
1120 #[test]
1121 fn convert_schema_with_nested_list_repeated_primitive() -> crate::errors::Result<()> {
1122 test_roundtrip(
1123 "
1124 message schema {
1125 optional group f1 (LIST) {
1126 repeated group element {
1127 repeated int32 element;
1128 }
1129 }
1130 }
1131 ",
1132 )
1133 }
1134
1135 #[test]
1136 fn convert_schema_with_repeated_primitive_keep_field_id() -> crate::errors::Result<()> {
1137 let message_type = "
1138 message schema {
1139 repeated BYTE_ARRAY col_1 = 1;
1140 }
1141 ";
1142
1143 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
1144 let schema = SchemaDescriptor::new(parsed_input_schema);
1145
1146 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
1147
1148 let DataType::Struct(schema_fields) = &converted.arrow_type else {
1149 panic!("Expected struct from convert_schema");
1150 };
1151
1152 assert_eq!(schema_fields.len(), 1);
1153
1154 let expected_schema = DataType::Struct(Fields::from(vec![Arc::new(
1155 arrow_schema::Field::new(
1156 "col_1",
1157 DataType::List(Arc::new(
1158 arrow_schema::Field::new("col_1", DataType::Binary, false),
1160 )),
1161 false,
1162 )
1163 .with_field_id(1),
1165 )]));
1166
1167 assert_eq!(converted.arrow_type, expected_schema);
1168
1169 Ok(())
1170 }
1171
1172 #[test]
1173 fn convert_schema_with_repeated_primitive_should_use_inferred_schema()
1174 -> crate::errors::Result<()> {
1175 let message_type = "
1176 message schema {
1177 repeated BYTE_ARRAY col_1 = 1;
1178 }
1179 ";
1180
1181 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
1182 let schema = SchemaDescriptor::new(parsed_input_schema);
1183
1184 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
1185
1186 let DataType::Struct(schema_fields) = &converted.arrow_type else {
1187 panic!("Expected struct from convert_schema");
1188 };
1189
1190 assert_eq!(schema_fields.len(), 1);
1191
1192 let expected_schema = DataType::Struct(Fields::from(vec![Arc::new(
1193 arrow_schema::Field::new(
1194 "col_1",
1195 DataType::List(Arc::new(arrow_schema::Field::new(
1196 "col_1",
1197 DataType::Binary,
1198 false,
1199 ))),
1200 false,
1201 )
1202 .with_metadata(schema_fields[0].metadata().clone()),
1203 )]));
1204
1205 assert_eq!(converted.arrow_type, expected_schema);
1206
1207 let utf8_instead_of_binary = Fields::from(vec![Arc::new(
1208 arrow_schema::Field::new(
1209 "col_1",
1210 DataType::List(Arc::new(arrow_schema::Field::new(
1211 "col_1",
1212 DataType::Utf8,
1213 false,
1214 ))),
1215 false,
1216 )
1217 .with_metadata(schema_fields[0].metadata().clone()),
1218 )]);
1219
1220 let converted_again = convert_schema(
1222 &schema,
1223 ProjectionMask::all(),
1224 Some(&utf8_instead_of_binary),
1225 )?
1226 .unwrap();
1227
1228 assert_eq!(
1230 converted_again.arrow_type,
1231 DataType::Struct(utf8_instead_of_binary)
1232 );
1233
1234 Ok(())
1235 }
1236
1237 #[test]
1238 fn convert_schema_with_repeated_primitive_should_use_inferred_schema_for_list_as_well()
1239 -> crate::errors::Result<()> {
1240 let message_type = "
1241 message schema {
1242 repeated BYTE_ARRAY col_1 = 1;
1243 }
1244 ";
1245
1246 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
1247 let schema = SchemaDescriptor::new(parsed_input_schema);
1248
1249 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
1250
1251 let DataType::Struct(schema_fields) = &converted.arrow_type else {
1252 panic!("Expected struct from convert_schema");
1253 };
1254
1255 assert_eq!(schema_fields.len(), 1);
1256
1257 let expected_schema = DataType::Struct(Fields::from(vec![Arc::new(
1258 arrow_schema::Field::new(
1259 "col_1",
1260 DataType::List(Arc::new(arrow_schema::Field::new(
1261 "col_1",
1262 DataType::Binary,
1263 false,
1264 ))),
1265 false,
1266 )
1267 .with_metadata(schema_fields[0].metadata().clone()),
1268 )]));
1269
1270 assert_eq!(converted.arrow_type, expected_schema);
1271
1272 let utf8_instead_of_binary = Fields::from(vec![Arc::new(
1273 arrow_schema::Field::new(
1274 "col_1",
1275 DataType::LargeList(Arc::new(arrow_schema::Field::new(
1277 "col_1",
1278 DataType::Utf8,
1279 false,
1280 ))),
1281 false,
1282 )
1283 .with_metadata(schema_fields[0].metadata().clone()),
1284 )]);
1285
1286 let converted_again = convert_schema(
1288 &schema,
1289 ProjectionMask::all(),
1290 Some(&utf8_instead_of_binary),
1291 )?
1292 .unwrap();
1293
1294 assert_eq!(
1296 converted_again.arrow_type,
1297 DataType::Struct(utf8_instead_of_binary)
1298 );
1299
1300 Ok(())
1301 }
1302
1303 #[test]
1304 fn convert_schema_with_repeated_struct_and_inferred_schema() -> crate::errors::Result<()> {
1305 test_roundtrip(
1306 "
1307 message schema {
1308 repeated group my_col_1 = 1 {
1309 optional binary my_col_2 = 2;
1310 optional binary my_col_3 = 3;
1311 optional group my_col_4 = 4 {
1312 optional int64 my_col_5 = 5;
1313 optional int32 my_col_6 = 6;
1314 }
1315 }
1316 }
1317 ",
1318 )
1319 }
1320
1321 #[test]
1322 fn convert_schema_with_repeated_struct_and_inferred_schema_and_field_id()
1323 -> crate::errors::Result<()> {
1324 let message_type = "
1325 message schema {
1326 repeated group my_col_1 = 1 {
1327 optional binary my_col_2 = 2;
1328 optional binary my_col_3 = 3;
1329 optional group my_col_4 = 4 {
1330 optional int64 my_col_5 = 5;
1331 optional int32 my_col_6 = 6;
1332 }
1333 }
1334 }
1335 ";
1336
1337 let parsed_input_schema = Arc::new(parse_message_type(message_type)?);
1338 let schema = SchemaDescriptor::new(parsed_input_schema);
1339
1340 let converted = convert_schema(&schema, ProjectionMask::all(), None)?.unwrap();
1341
1342 let DataType::Struct(schema_fields) = &converted.arrow_type else {
1343 panic!("Expected struct from convert_schema");
1344 };
1345
1346 assert_eq!(schema_fields.len(), 1);
1347
1348 let converted_again =
1350 convert_schema(&schema, ProjectionMask::all(), Some(schema_fields))?.unwrap();
1351
1352 assert_eq!(converted_again.arrow_type, converted.arrow_type);
1354
1355 Ok(())
1356 }
1357
#[test]
// Nested repeated groups and repeated primitives: checks the default inference,
// that re-using the inferred fields as a hint is a no-op, and that a hint with
// other list flavours / compatible leaf types is reproduced verbatim.
fn convert_schema_with_nested_repeated_struct_and_primitives() -> crate::errors::Result<()> {
    let message_type = "
message schema {
    repeated group my_col_1 = 1 {
        optional binary my_col_2 = 2;
        repeated BYTE_ARRAY my_col_3 = 3;
        repeated group my_col_4 = 4 {
            optional int64 my_col_5 = 5;
            repeated binary my_col_6 = 6;
        }
    }
}
";

    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type)?));

    let converted = convert_schema(&descriptor, ProjectionMask::all(), None)?.unwrap();
    let DataType::Struct(schema_fields) = &converted.arrow_type else {
        panic!("Expected struct from convert_schema");
    };
    assert_eq!(schema_fields.len(), 1);

    // Default inference: each repeated field becomes a non-nullable `List` whose
    // element is named after the repeated field itself.
    let inferred_col_4_element = Field::new_struct(
        "my_col_4",
        vec![
            Field::new("my_col_5", DataType::Int64, true).with_field_id(5),
            Field::new_list(
                "my_col_6",
                Field::new("my_col_6", DataType::Binary, false),
                false,
            )
            .with_field_id(6),
        ],
        false,
    );
    let inferred_col_1_element = Field::new_struct(
        "my_col_1",
        vec![
            Field::new("my_col_2", DataType::Binary, true).with_field_id(2),
            Field::new_list(
                "my_col_3",
                Field::new("my_col_3", DataType::Binary, false),
                false,
            )
            .with_field_id(3),
            Field::new_list("my_col_4", inferred_col_4_element, false).with_field_id(4),
        ],
        false,
    );
    let expected_schema = DataType::Struct(Fields::from(vec![
        Field::new_list("my_col_1", inferred_col_1_element, false).with_field_id(1),
    ]));

    assert_eq!(converted.arrow_type, expected_schema);

    // Round trip: the inferred fields used as a hint must give the same type.
    let converted_again =
        convert_schema(&descriptor, ProjectionMask::all(), Some(schema_fields))?.unwrap();
    assert_eq!(converted_again.arrow_type, converted.arrow_type);

    // Hinted conversion: LargeList / FixedSizeList containers and alternative
    // leaf types (LargeBinary, Utf8, BinaryView) must be honoured exactly.
    let hinted_col_4_element = Field::new_struct(
        "my_col_4",
        vec![
            Field::new("my_col_5", DataType::Int64, true).with_field_id(5),
            Field::new_large_list(
                "my_col_6",
                Field::new("my_col_6", DataType::BinaryView, false),
                false,
            )
            .with_field_id(6),
        ],
        false,
    );
    let hinted_col_1_element = Field::new_struct(
        "my_col_1",
        vec![
            Field::new("my_col_2", DataType::LargeBinary, true).with_field_id(2),
            Field::new_large_list(
                "my_col_3",
                Field::new("my_col_3", DataType::Utf8, false),
                false,
            )
            .with_field_id(3),
            Field::new_fixed_size_list("my_col_4", hinted_col_4_element, 3, false)
                .with_field_id(4),
        ],
        false,
    );
    let modified_schema_fields = Fields::from(vec![
        Field::new_large_list("my_col_1", hinted_col_1_element, false).with_field_id(1),
    ]);

    let converted_with_modified = convert_schema(
        &descriptor,
        ProjectionMask::all(),
        Some(&modified_schema_fields),
    )?
    .unwrap();

    assert_eq!(
        converted_with_modified.arrow_type,
        DataType::Struct(modified_schema_fields)
    );

    Ok(())
}
1549
#[test]
// Standard 3-level LIST with an optional `element`: the arrow item is nullable.
fn list_nullable_element_standard() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group list {
          optional int32 element;
        }
      }
    }";
    let item = Field::new("element", DataType::Int32, true);
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1570
#[test]
// Repeated group with a single optional child that is not named `array` or
// `<list>_tuple`: the child (`num`) becomes the nullable list item.
fn list_nullable_element_nested() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group element {
          optional int32 num;
        }
      }
    }";
    let item = Field::new("num", DataType::Int32, true);
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1591
#[test]
// Standard 3-level LIST with a required `element`: the arrow item is non-nullable.
fn list_required_element_standard() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group list {
          required int32 element;
        }
      }
    }";
    let item = Field::new("element", DataType::Int32, false);
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1612
#[test]
// Repeated group with a single required child `num`: that child is the
// non-nullable list item.
fn list_required_element_nested() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group element {
          required int32 num;
        }
      }
    }";
    let item = Field::new("num", DataType::Int32, false);
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1633
#[test]
// 2-level LIST whose repeated field is a primitive: that primitive is the
// non-nullable list item.
fn list_required_element_primitive() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated int32 element;
      }
    }";
    let item = Field::new("element", DataType::Int32, false);
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1652
#[test]
// A repeated group with more than one child is itself the list item: a
// non-nullable struct named after the repeated group.
fn list_required_element_struct() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group element {
          required binary str (UTF8);
          required int32 num;
        }
      }
    }";
    let item = Field::new_struct(
        "element",
        vec![
            Field::new("str", DataType::Utf8, false),
            Field::new("num", DataType::Int32, false),
        ],
        false,
    );
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1681
#[test]
// Avro-style list: a single-child repeated group named `array` is kept as a
// struct item rather than being unwrapped.
fn list_required_element_avro_style() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group array {
          required binary str (UTF8);
        }
      }
    }";
    let item = Field::new_struct(
        "array",
        vec![Field::new("str", DataType::Utf8, false)],
        false,
    );
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1706
#[test]
// Thrift-style list: a single-child repeated group named `<list>_tuple` is
// kept as a struct item rather than being unwrapped.
fn list_required_element_thrift_style() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (LIST) {
        repeated group f1_tuple {
          required binary str (UTF8);
        }
      }
    }";
    let item = Field::new_struct(
        "f1_tuple",
        vec![Field::new("str", DataType::Utf8, false)],
        false,
    );
    let expected = Fields::from(vec![Field::new_list("f1", item, true)]);
    test_expected_type(parquet_schema, expected)
}
1731
#[test]
// Standard MAP with `key_value` entries and a required value.
fn map_required_value_standard() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP) {
        repeated group key_value {
          required int32 key;
          required binary value (UTF8);
        }
      }
    }";
    let key = Field::new("key", DataType::Int32, false);
    let value = Field::new("value", DataType::Utf8, false);
    let map = Field::new_map("f1", "key_value", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1756
#[test]
// Legacy MAP_KEY_VALUE annotation on the outer group; key/value children keep
// their own names (`num`, `str`), value required.
fn map_required_value_map_key_value() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP_KEY_VALUE) {
        repeated group map {
          required int32 num;
          required binary str (UTF8);
        }
      }
    }";
    let key = Field::new("num", DataType::Int32, false);
    let value = Field::new("str", DataType::Utf8, false);
    let map = Field::new_map("f1", "map", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1781
#[test]
// Legacy layout: MAP outer group with a MAP_KEY_VALUE-annotated repeated
// `map` group; value required.
fn map_required_value_legacy() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP) {
        repeated group map (MAP_KEY_VALUE) {
          required int32 key;
          required binary value (UTF8);
        }
      }
    }";
    let key = Field::new("key", DataType::Int32, false);
    let value = Field::new("value", DataType::Utf8, false);
    let map = Field::new_map("f1", "map", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1806
#[test]
// Standard MAP with `key_value` entries and an optional (nullable) value.
fn map_optional_value_standard() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP) {
        repeated group key_value {
          required int32 key;
          optional binary value (UTF8);
        }
      }
    }";
    let key = Field::new("key", DataType::Int32, false);
    let value = Field::new("value", DataType::Utf8, true);
    let map = Field::new_map("f1", "key_value", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1831
#[test]
// Legacy MAP_KEY_VALUE annotation on the outer group with an optional value
// child named `str`.
fn map_optional_value_map_key_value() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP_KEY_VALUE) {
        repeated group map {
          required int32 num;
          optional binary str (UTF8);
        }
      }
    }";
    let key = Field::new("num", DataType::Int32, false);
    let value = Field::new("str", DataType::Utf8, true);
    let map = Field::new_map("f1", "map", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1856
#[test]
// MAP outer group with a MAP_KEY_VALUE-annotated repeated `map` group and an
// optional (nullable) value.
fn map_optional_value_avro_style() -> crate::errors::Result<()> {
    let parquet_schema = "
    message root {
      optional group f1 (MAP) {
        repeated group map (MAP_KEY_VALUE) {
          required int32 key;
          optional binary value (UTF8);
        }
      }
    }";
    let key = Field::new("key", DataType::Int32, false);
    let value = Field::new("value", DataType::Utf8, true);
    let map = Field::new_map("f1", "map", key, value, false, true);
    test_expected_type(parquet_schema, Fields::from(vec![map]))
}
1881}