1use std::vec::IntoIter;
21use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::file::metadata::HeapSize;
24use crate::file::metadata::thrift::SchemaElement;
25
26use crate::basic::{
27 ColumnOrder, ConvertedType, LogicalType, Repetition, SortOrder, TimeUnit, Type as PhysicalType,
28};
29use crate::errors::{ParquetError, Result};
30
/// Reference-counted pointer to a schema [`Type`] node.
pub type TypePtr = Arc<Type>;
/// Reference-counted pointer to a [`SchemaDescriptor`].
pub type SchemaDescPtr = Arc<SchemaDescriptor>;
/// Reference-counted pointer to a [`ColumnDescriptor`].
pub type ColumnDescPtr = Arc<ColumnDescriptor>;
40
/// A node of a schema tree.
///
/// A node is either a primitive leaf ([`Type::PrimitiveType`]) or a group that
/// owns a list of child nodes ([`Type::GroupType`]). Instances are normally
/// created through [`Type::primitive_type_builder`] and
/// [`Type::group_type_builder`].
#[derive(Clone, Debug, PartialEq)]
pub enum Type {
    /// Leaf node that carries a physical type.
    PrimitiveType {
        /// Name, repetition, converted/logical type and optional id of the node.
        basic_info: BasicTypeInfo,
        /// Physical type of the leaf.
        physical_type: PhysicalType,
        /// Length given to the builder (`-1` when it was never set).
        type_length: i32,
        /// Decimal scale given to the builder (`-1` when it was never set).
        scale: i32,
        /// Decimal precision given to the builder (`-1` when it was never set).
        precision: i32,
    },
    /// Node that groups child nodes.
    GroupType {
        /// Name, repetition, converted/logical type and optional id of the node.
        basic_info: BasicTypeInfo,
        /// Child nodes, in declaration order.
        fields: Vec<TypePtr>,
    },
}
70
71impl HeapSize for Type {
72 fn heap_size(&self) -> usize {
73 match self {
74 Type::PrimitiveType { basic_info, .. } => basic_info.heap_size(),
75 Type::GroupType { basic_info, fields } => basic_info.heap_size() + fields.heap_size(),
76 }
77 }
78}
79
80impl Type {
81 pub fn primitive_type_builder(
83 name: &str,
84 physical_type: PhysicalType,
85 ) -> PrimitiveTypeBuilder<'_> {
86 PrimitiveTypeBuilder::new(name, physical_type)
87 }
88
89 pub fn group_type_builder(name: &str) -> GroupTypeBuilder<'_> {
91 GroupTypeBuilder::new(name)
92 }
93
94 pub fn get_basic_info(&self) -> &BasicTypeInfo {
96 match *self {
97 Type::PrimitiveType { ref basic_info, .. } => basic_info,
98 Type::GroupType { ref basic_info, .. } => basic_info,
99 }
100 }
101
102 pub fn name(&self) -> &str {
104 self.get_basic_info().name()
105 }
106
107 pub fn get_fields(&self) -> &[TypePtr] {
111 match *self {
112 Type::GroupType { ref fields, .. } => &fields[..],
113 _ => panic!("Cannot call get_fields() on a non-group type"),
114 }
115 }
116
117 pub fn get_physical_type(&self) -> PhysicalType {
120 match *self {
121 Type::PrimitiveType {
122 basic_info: _,
123 physical_type,
124 ..
125 } => physical_type,
126 _ => panic!("Cannot call get_physical_type() on a non-primitive type"),
127 }
128 }
129
130 pub fn get_precision(&self) -> i32 {
133 match *self {
134 Type::PrimitiveType { precision, .. } => precision,
135 _ => panic!("Cannot call get_precision() on non-primitive type"),
136 }
137 }
138
139 pub fn get_scale(&self) -> i32 {
142 match *self {
143 Type::PrimitiveType { scale, .. } => scale,
144 _ => panic!("Cannot call get_scale() on non-primitive type"),
145 }
146 }
147
148 pub fn check_contains(&self, sub_type: &Type) -> bool {
151 let basic_match = self.get_basic_info().name() == sub_type.get_basic_info().name()
153 && (self.is_schema() && sub_type.is_schema()
154 || !self.is_schema()
155 && !sub_type.is_schema()
156 && self.get_basic_info().repetition()
157 == sub_type.get_basic_info().repetition());
158
159 match *self {
160 Type::PrimitiveType { .. } if basic_match && sub_type.is_primitive() => {
161 self.get_physical_type() == sub_type.get_physical_type()
162 }
163 Type::GroupType { .. } if basic_match && sub_type.is_group() => {
164 let mut field_map = HashMap::new();
166 for field in self.get_fields() {
167 field_map.insert(field.name(), field);
168 }
169
170 for field in sub_type.get_fields() {
171 if !field_map
172 .get(field.name())
173 .map(|tpe| tpe.check_contains(field))
174 .unwrap_or(false)
175 {
176 return false;
177 }
178 }
179 true
180 }
181 _ => false,
182 }
183 }
184
185 pub fn is_primitive(&self) -> bool {
187 matches!(*self, Type::PrimitiveType { .. })
188 }
189
190 pub fn is_group(&self) -> bool {
192 matches!(*self, Type::GroupType { .. })
193 }
194
195 pub fn is_schema(&self) -> bool {
197 match *self {
198 Type::GroupType { ref basic_info, .. } => !basic_info.has_repetition(),
199 _ => false,
200 }
201 }
202
203 pub fn is_optional(&self) -> bool {
206 self.get_basic_info().has_repetition()
207 && self.get_basic_info().repetition() != Repetition::REQUIRED
208 }
209
210 pub(crate) fn is_list(&self) -> bool {
212 if self.is_group() {
213 let basic_info = self.get_basic_info();
214 if let Some(logical_type) = basic_info.logical_type_ref() {
215 return logical_type == &LogicalType::List;
216 }
217 return basic_info.converted_type() == ConvertedType::LIST;
218 }
219 false
220 }
221
222 pub(crate) fn has_single_repeated_child(&self) -> bool {
224 if self.is_group() {
225 let children = self.get_fields();
226 return children.len() == 1
227 && children[0].get_basic_info().has_repetition()
228 && children[0].get_basic_info().repetition() == Repetition::REPEATED;
229 }
230 false
231 }
232}
233
/// Builder for [`Type::PrimitiveType`].
///
/// Created with [`PrimitiveTypeBuilder::new`]; settings are validated when
/// [`PrimitiveTypeBuilder::build`] is called.
pub struct PrimitiveTypeBuilder<'a> {
    /// Name of the field being built.
    name: &'a str,
    /// Repetition of the field (`OPTIONAL` by default).
    repetition: Repetition,
    /// Physical type of the field.
    physical_type: PhysicalType,
    /// Converted type annotation (`NONE` by default).
    converted_type: ConvertedType,
    /// Optional logical type annotation.
    logical_type: Option<LogicalType>,
    /// Type length (`-1` when unset).
    length: i32,
    /// Decimal precision (`-1` when unset).
    precision: i32,
    /// Decimal scale (`-1` when unset).
    scale: i32,
    /// Optional field id.
    id: Option<i32>,
}
248
249impl<'a> PrimitiveTypeBuilder<'a> {
250 pub fn new(name: &'a str, physical_type: PhysicalType) -> Self {
252 Self {
253 name,
254 repetition: Repetition::OPTIONAL,
255 physical_type,
256 converted_type: ConvertedType::NONE,
257 logical_type: None,
258 length: -1,
259 precision: -1,
260 scale: -1,
261 id: None,
262 }
263 }
264
265 pub fn with_repetition(self, repetition: Repetition) -> Self {
267 Self { repetition, ..self }
268 }
269
270 pub fn with_converted_type(self, converted_type: ConvertedType) -> Self {
272 Self {
273 converted_type,
274 ..self
275 }
276 }
277
278 pub fn with_logical_type(self, logical_type: Option<LogicalType>) -> Self {
282 Self {
283 logical_type,
284 ..self
285 }
286 }
287
288 pub fn with_length(self, length: i32) -> Self {
293 Self { length, ..self }
294 }
295
296 pub fn with_precision(self, precision: i32) -> Self {
299 Self { precision, ..self }
300 }
301
302 pub fn with_scale(self, scale: i32) -> Self {
305 Self { scale, ..self }
306 }
307
308 pub fn with_id(self, id: Option<i32>) -> Self {
310 Self { id, ..self }
311 }
312
    /// Validates the builder's settings and creates a [`Type::PrimitiveType`].
    ///
    /// When only a logical type is given, the stored converted type is derived
    /// from it.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the physical type is `FIXED_LEN_BYTE_ARRAY` and the length is negative;
    /// - both a logical and a converted type are set but do not correspond;
    /// - the logical type cannot annotate the physical type (or length);
    /// - the converted type cannot annotate the physical type (or length);
    /// - the decimal precision/scale checks fail.
    pub fn build(self) -> Result<Type> {
        let mut basic_info = BasicTypeInfo {
            name: String::from(self.name),
            repetition: Some(self.repetition),
            converted_type: self.converted_type,
            logical_type: self.logical_type.clone(),
            id: self.id,
        };

        // The length is validated first because the logical/converted type
        // checks below (UUID, FLOAT16, INTERVAL, DECIMAL) rely on it.
        if self.physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY && self.length < 0 {
            return Err(general_err!(
                "Invalid FIXED_LEN_BYTE_ARRAY length: {} for field '{}'",
                self.length,
                self.name
            ));
        }

        if let Some(logical_type) = &self.logical_type {
            if self.converted_type != ConvertedType::NONE {
                // Both annotations were supplied: they must describe the same type.
                if ConvertedType::from(self.logical_type.clone()) != self.converted_type {
                    return Err(general_err!(
                        "Logical type {:?} is incompatible with converted type {} for field '{}'",
                        logical_type,
                        self.converted_type,
                        self.name
                    ));
                }
            } else {
                // Only the logical type was supplied: derive the converted type from it.
                basic_info.converted_type = self.logical_type.clone().into();
            }
            // Check that the logical type may annotate the physical type.
            // Arm order matters: specific accepting arms come before the
            // catch-all error at the end.
            match (logical_type, self.physical_type) {
                (LogicalType::Map, _) | (LogicalType::List, _) => {
                    return Err(general_err!(
                        "{:?} cannot be applied to a primitive type for field '{}'",
                        logical_type,
                        self.name
                    ));
                }
                (LogicalType::Enum, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Decimal { scale, precision }, _) => {
                    // The values inside the logical type must equal the
                    // builder's own scale/precision settings.
                    if *scale != self.scale {
                        return Err(general_err!(
                            "DECIMAL logical type scale {} must match self.scale {} for field '{}'",
                            scale,
                            self.scale,
                            self.name
                        ));
                    }
                    if *precision != self.precision {
                        return Err(general_err!(
                            "DECIMAL logical type precision {} must match self.precision {} for field '{}'",
                            precision,
                            self.precision,
                            self.name
                        ));
                    }
                    self.check_decimal_precision_scale()?;
                }
                (LogicalType::Date, PhysicalType::INT32) => {}
                (
                    LogicalType::Time {
                        unit: TimeUnit::MILLIS,
                        ..
                    },
                    PhysicalType::INT32,
                ) => {}
                (LogicalType::Time { unit, .. }, PhysicalType::INT64) => {
                    // Millisecond times are only accepted on INT32 (arm above).
                    if *unit == TimeUnit::MILLIS {
                        return Err(general_err!(
                            "Cannot use millisecond unit on INT64 type for field '{}'",
                            self.name
                        ));
                    }
                }
                (LogicalType::Timestamp { .. }, PhysicalType::INT64) => {}
                (LogicalType::Integer { bit_width, .. }, PhysicalType::INT32)
                    if *bit_width <= 32 => {}
                (LogicalType::Integer { bit_width, .. }, PhysicalType::INT64)
                    if *bit_width == 64 => {}
                (LogicalType::Unknown, PhysicalType::INT32) => {}
                (LogicalType::String, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Json, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Bson, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Geometry { .. }, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Geography { .. }, PhysicalType::BYTE_ARRAY) => {}
                (LogicalType::Uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY) if self.length == 16 => {}
                (LogicalType::Uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {
                    return Err(general_err!(
                        "UUID cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(16) field",
                        self.name
                    ));
                }
                (LogicalType::Float16, PhysicalType::FIXED_LEN_BYTE_ARRAY) if self.length == 2 => {}
                (LogicalType::Float16, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {
                    return Err(general_err!(
                        "FLOAT16 cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(2) field",
                        self.name
                    ));
                }
                // NOTE(review): presumably a logical type this crate does not
                // recognise; it is accepted on any physical type — confirm.
                (LogicalType::_Unknown { .. }, _) => {}
                // Every other combination is rejected.
                (a, b) => {
                    return Err(general_err!(
                        "Cannot annotate {:?} from {} for field '{}'",
                        a,
                        b,
                        self.name
                    ));
                }
            }
        }

        // Check the converted type as supplied by the caller. A converted type
        // that was only derived from the logical type above is not re-checked
        // here, because `self.converted_type` is still `NONE` in that case.
        match self.converted_type {
            ConvertedType::NONE => {}
            ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => {
                if self.physical_type != PhysicalType::BYTE_ARRAY {
                    return Err(general_err!(
                        "{} cannot annotate field '{}' because it is not a BYTE_ARRAY field",
                        self.converted_type,
                        self.name
                    ));
                }
            }
            ConvertedType::DECIMAL => {
                self.check_decimal_precision_scale()?;
            }
            ConvertedType::DATE
            | ConvertedType::TIME_MILLIS
            | ConvertedType::UINT_8
            | ConvertedType::UINT_16
            | ConvertedType::UINT_32
            | ConvertedType::INT_8
            | ConvertedType::INT_16
            | ConvertedType::INT_32 => {
                if self.physical_type != PhysicalType::INT32 {
                    return Err(general_err!(
                        "{} cannot annotate field '{}' because it is not a INT32 field",
                        self.converted_type,
                        self.name
                    ));
                }
            }
            ConvertedType::TIME_MICROS
            | ConvertedType::TIMESTAMP_MILLIS
            | ConvertedType::TIMESTAMP_MICROS
            | ConvertedType::UINT_64
            | ConvertedType::INT_64 => {
                if self.physical_type != PhysicalType::INT64 {
                    return Err(general_err!(
                        "{} cannot annotate field '{}' because it is not a INT64 field",
                        self.converted_type,
                        self.name
                    ));
                }
            }
            ConvertedType::INTERVAL => {
                if self.physical_type != PhysicalType::FIXED_LEN_BYTE_ARRAY || self.length != 12 {
                    return Err(general_err!(
                        "INTERVAL cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(12) field",
                        self.name
                    ));
                }
            }
            ConvertedType::ENUM => {
                if self.physical_type != PhysicalType::BYTE_ARRAY {
                    return Err(general_err!(
                        "ENUM cannot annotate field '{}' because it is not a BYTE_ARRAY field",
                        self.name
                    ));
                }
            }
            // Any remaining converted type is not valid on a primitive field.
            _ => {
                return Err(general_err!(
                    "{} cannot be applied to primitive field '{}'",
                    self.converted_type,
                    self.name
                ));
            }
        }

        Ok(Type::PrimitiveType {
            basic_info,
            physical_type: self.physical_type,
            type_length: self.length,
            scale: self.scale,
            precision: self.precision,
        })
    }
510
511 #[inline]
512 fn check_decimal_precision_scale(&self) -> Result<()> {
513 match self.physical_type {
514 PhysicalType::INT32
515 | PhysicalType::INT64
516 | PhysicalType::BYTE_ARRAY
517 | PhysicalType::FIXED_LEN_BYTE_ARRAY => (),
518 _ => {
519 return Err(general_err!(
520 "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
521 ));
522 }
523 }
524
525 if self.precision < 1 {
527 return Err(general_err!(
528 "Invalid DECIMAL precision: {}",
529 self.precision
530 ));
531 }
532
533 if self.scale < 0 {
535 return Err(general_err!("Invalid DECIMAL scale: {}", self.scale));
536 }
537
538 if self.scale > self.precision {
539 return Err(general_err!(
540 "Invalid DECIMAL: scale ({}) cannot be greater than precision \
541 ({})",
542 self.scale,
543 self.precision
544 ));
545 }
546
547 match self.physical_type {
549 PhysicalType::INT32 => {
550 if self.precision > 9 {
551 return Err(general_err!(
552 "Cannot represent INT32 as DECIMAL with precision {}",
553 self.precision
554 ));
555 }
556 }
557 PhysicalType::INT64 => {
558 if self.precision > 18 {
559 return Err(general_err!(
560 "Cannot represent INT64 as DECIMAL with precision {}",
561 self.precision
562 ));
563 }
564 }
565 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
566 let length = self
567 .length
568 .checked_mul(8)
569 .ok_or(general_err!("Invalid length {} for Decimal", self.length))?;
570 let max_precision = (2f64.powi(length - 1) - 1f64).log10().floor() as i32;
571
572 if self.precision > max_precision {
573 return Err(general_err!(
574 "Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \
575 precision {}. The max precision can only be {}",
576 self.length,
577 self.precision,
578 max_precision
579 ));
580 }
581 }
582 _ => (), }
584
585 Ok(())
586 }
587}
588
/// Builder for [`Type::GroupType`].
///
/// Created with [`GroupTypeBuilder::new`] and finished with
/// [`GroupTypeBuilder::build`].
pub struct GroupTypeBuilder<'a> {
    /// Name of the group being built.
    name: &'a str,
    /// Repetition of the group; `None` until `with_repetition` is called.
    repetition: Option<Repetition>,
    /// Converted type annotation (`NONE` by default).
    converted_type: ConvertedType,
    /// Optional logical type annotation.
    logical_type: Option<LogicalType>,
    /// Child nodes of the group.
    fields: Vec<TypePtr>,
    /// Optional field id.
    id: Option<i32>,
}
600
601impl<'a> GroupTypeBuilder<'a> {
602 pub fn new(name: &'a str) -> Self {
604 Self {
605 name,
606 repetition: None,
607 converted_type: ConvertedType::NONE,
608 logical_type: None,
609 fields: Vec::new(),
610 id: None,
611 }
612 }
613
614 pub fn with_repetition(mut self, repetition: Repetition) -> Self {
616 self.repetition = Some(repetition);
617 self
618 }
619
620 pub fn with_converted_type(self, converted_type: ConvertedType) -> Self {
622 Self {
623 converted_type,
624 ..self
625 }
626 }
627
628 pub fn with_logical_type(self, logical_type: Option<LogicalType>) -> Self {
630 Self {
631 logical_type,
632 ..self
633 }
634 }
635
636 pub fn with_fields(self, fields: Vec<TypePtr>) -> Self {
639 Self { fields, ..self }
640 }
641
642 pub fn with_id(self, id: Option<i32>) -> Self {
644 Self { id, ..self }
645 }
646
647 pub fn build(self) -> Result<Type> {
649 let mut basic_info = BasicTypeInfo {
650 name: String::from(self.name),
651 repetition: self.repetition,
652 converted_type: self.converted_type,
653 logical_type: self.logical_type.clone(),
654 id: self.id,
655 };
656 if self.logical_type.is_some() && self.converted_type == ConvertedType::NONE {
658 basic_info.converted_type = self.logical_type.into();
659 }
660 Ok(Type::GroupType {
661 basic_info,
662 fields: self.fields,
663 })
664 }
665}
666
/// Information shared by primitive and group types: name, repetition,
/// annotations and id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BasicTypeInfo {
    /// Name of the node.
    name: String,
    /// Repetition of the node; `None` when no repetition was set (see
    /// `Type::is_schema`).
    repetition: Option<Repetition>,
    /// Converted type annotation.
    converted_type: ConvertedType,
    /// Optional logical type annotation.
    logical_type: Option<LogicalType>,
    /// Optional field id.
    id: Option<i32>,
}
677
impl HeapSize for BasicTypeInfo {
    /// Returns the heap size of the `name` string; the other fields are not
    /// counted.
    fn heap_size(&self) -> usize {
        self.name.heap_size()
    }
}
684
685impl BasicTypeInfo {
686 pub fn name(&self) -> &str {
688 &self.name
689 }
690
691 pub fn has_repetition(&self) -> bool {
695 self.repetition.is_some()
696 }
697
698 pub fn repetition(&self) -> Repetition {
700 assert!(self.repetition.is_some());
701 self.repetition.unwrap()
702 }
703
704 pub fn converted_type(&self) -> ConvertedType {
706 self.converted_type
707 }
708
709 #[deprecated(
714 since = "57.1.0",
715 note = "use `BasicTypeInfo::logical_type_ref` instead (LogicalType cloning is non trivial)"
716 )]
717 pub fn logical_type(&self) -> Option<LogicalType> {
718 self.logical_type.clone()
720 }
721
722 pub fn logical_type_ref(&self) -> Option<&LogicalType> {
724 self.logical_type.as_ref()
725 }
726
727 pub fn has_id(&self) -> bool {
729 self.id.is_some()
730 }
731
732 pub fn id(&self) -> i32 {
734 assert!(self.id.is_some());
735 self.id.unwrap()
736 }
737}
738
/// Path of a column, stored as the list of its name components.
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
pub struct ColumnPath {
    /// Name components, ordered from the outermost to the innermost.
    parts: Vec<String>,
}
764
impl HeapSize for ColumnPath {
    /// Returns the heap size of the vector of path components.
    fn heap_size(&self) -> usize {
        self.parts.heap_size()
    }
}
770
771impl ColumnPath {
772 pub fn new(parts: Vec<String>) -> Self {
774 ColumnPath { parts }
775 }
776
777 pub fn string(&self) -> String {
785 self.parts.join(".")
786 }
787
788 pub fn append(&mut self, mut tail: Vec<String>) {
800 self.parts.append(&mut tail);
801 }
802
803 pub fn parts(&self) -> &[String] {
805 &self.parts
806 }
807}
808
809impl fmt::Display for ColumnPath {
810 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
811 write!(f, "{:?}", self.string())
812 }
813}
814
815impl From<Vec<String>> for ColumnPath {
816 fn from(parts: Vec<String>) -> Self {
817 ColumnPath { parts }
818 }
819}
820
821impl From<&str> for ColumnPath {
822 fn from(single_path: &str) -> Self {
823 let s = String::from(single_path);
824 ColumnPath::from(s)
825 }
826}
827
828impl From<String> for ColumnPath {
829 fn from(single_path: String) -> Self {
830 let v = vec![single_path];
831 ColumnPath { parts: v }
832 }
833}
834
835impl AsRef<[String]> for ColumnPath {
836 fn as_ref(&self) -> &[String] {
837 &self.parts
838 }
839}
840
/// Describes one leaf column: its primitive type, maximum definition and
/// repetition levels, and its path.
#[derive(Debug, PartialEq)]
pub struct ColumnDescriptor {
    /// The type of the column; accessors such as `physical_type` panic when
    /// this is not a primitive type.
    primitive_type: TypePtr,

    /// Maximum definition level of the column.
    max_def_level: i16,

    /// Maximum repetition level of the column.
    max_rep_level: i16,

    /// Path of the column.
    path: ColumnPath,
}
859
impl HeapSize for ColumnDescriptor {
    /// Returns the heap size of the column path only; `primitive_type` is not
    /// included in the result.
    fn heap_size(&self) -> usize {
        self.path.heap_size()
    }
}
867
868impl ColumnDescriptor {
869 pub fn new(
871 primitive_type: TypePtr,
872 max_def_level: i16,
873 max_rep_level: i16,
874 path: ColumnPath,
875 ) -> Self {
876 Self {
877 primitive_type,
878 max_def_level,
879 max_rep_level,
880 path,
881 }
882 }
883
884 #[inline]
886 pub fn max_def_level(&self) -> i16 {
887 self.max_def_level
888 }
889
890 #[inline]
892 pub fn max_rep_level(&self) -> i16 {
893 self.max_rep_level
894 }
895
896 pub fn path(&self) -> &ColumnPath {
898 &self.path
899 }
900
901 pub fn self_type(&self) -> &Type {
903 self.primitive_type.as_ref()
904 }
905
906 pub fn self_type_ptr(&self) -> TypePtr {
909 self.primitive_type.clone()
910 }
911
912 pub fn name(&self) -> &str {
914 self.primitive_type.name()
915 }
916
917 pub fn converted_type(&self) -> ConvertedType {
919 self.primitive_type.get_basic_info().converted_type()
920 }
921
922 #[deprecated(
927 since = "57.1.0",
928 note = "use `ColumnDescriptor::logical_type_ref` instead (LogicalType cloning is non trivial)"
929 )]
930 pub fn logical_type(&self) -> Option<LogicalType> {
931 self.primitive_type
932 .get_basic_info()
933 .logical_type_ref()
934 .cloned()
935 }
936
937 pub fn logical_type_ref(&self) -> Option<&LogicalType> {
939 self.primitive_type.get_basic_info().logical_type_ref()
940 }
941
942 pub fn physical_type(&self) -> PhysicalType {
945 match self.primitive_type.as_ref() {
946 Type::PrimitiveType { physical_type, .. } => *physical_type,
947 _ => panic!("Expected primitive type!"),
948 }
949 }
950
951 pub fn type_length(&self) -> i32 {
954 match self.primitive_type.as_ref() {
955 Type::PrimitiveType { type_length, .. } => *type_length,
956 _ => panic!("Expected primitive type!"),
957 }
958 }
959
960 pub fn type_precision(&self) -> i32 {
963 match self.primitive_type.as_ref() {
964 Type::PrimitiveType { precision, .. } => *precision,
965 _ => panic!("Expected primitive type!"),
966 }
967 }
968
969 pub fn type_scale(&self) -> i32 {
972 match self.primitive_type.as_ref() {
973 Type::PrimitiveType { scale, .. } => *scale,
974 _ => panic!("Expected primitive type!"),
975 }
976 }
977
978 pub fn sort_order(&self) -> SortOrder {
980 ColumnOrder::sort_order_for_type(
981 self.logical_type_ref(),
982 self.converted_type(),
983 self.physical_type(),
984 )
985 }
986}
987
/// A schema together with a flattened view of its leaf columns.
#[derive(PartialEq, Clone)]
pub struct SchemaDescriptor {
    /// Root of the schema tree; always a group type (asserted in `new`).
    schema: TypePtr,

    /// One descriptor per primitive leaf, in the order the tree is traversed
    /// when the descriptor is created.
    leaves: Vec<ColumnDescPtr>,

    /// For every entry of `leaves`, the index of the root-level field of
    /// `schema` under which that leaf is found.
    leaf_to_base: Vec<usize>,
}
1043
1044impl fmt::Debug for SchemaDescriptor {
1045 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046 f.debug_struct("SchemaDescriptor")
1048 .field("schema", &self.schema)
1049 .finish()
1050 }
1051}
1052
1053impl HeapSize for SchemaDescriptor {
1055 fn heap_size(&self) -> usize {
1056 self.schema.heap_size() + self.leaves.heap_size() + self.leaf_to_base.heap_size()
1057 }
1058}
1059
1060impl SchemaDescriptor {
1061 pub fn new(tp: TypePtr) -> Self {
1063 const INIT_SCHEMA_DEPTH: usize = 16;
1064 assert!(tp.is_group(), "SchemaDescriptor should take a GroupType");
1065 let n_leaves = num_leaves(&tp).unwrap();
1067 let mut leaves = Vec::with_capacity(n_leaves);
1068 let mut leaf_to_base = Vec::with_capacity(n_leaves);
1069 let mut path = Vec::with_capacity(INIT_SCHEMA_DEPTH);
1070 for (root_idx, f) in tp.get_fields().iter().enumerate() {
1071 path.clear();
1072 build_tree(f, root_idx, 0, 0, &mut leaves, &mut leaf_to_base, &mut path);
1073 }
1074
1075 Self {
1076 schema: tp,
1077 leaves,
1078 leaf_to_base,
1079 }
1080 }
1081
1082 pub fn column(&self, i: usize) -> ColumnDescPtr {
1084 assert!(
1085 i < self.leaves.len(),
1086 "Index out of bound: {} not in [0, {})",
1087 i,
1088 self.leaves.len()
1089 );
1090 self.leaves[i].clone()
1091 }
1092
1093 pub fn columns(&self) -> &[ColumnDescPtr] {
1095 &self.leaves
1096 }
1097
1098 pub fn num_columns(&self) -> usize {
1100 self.leaves.len()
1101 }
1102
1103 pub fn get_column_root(&self, i: usize) -> &Type {
1105 let result = self.column_root_of(i);
1106 result.as_ref()
1107 }
1108
1109 pub fn get_column_root_ptr(&self, i: usize) -> TypePtr {
1111 let result = self.column_root_of(i);
1112 result.clone()
1113 }
1114
1115 pub fn get_column_root_idx(&self, leaf: usize) -> usize {
1117 assert!(
1118 leaf < self.leaves.len(),
1119 "Index out of bound: {} not in [0, {})",
1120 leaf,
1121 self.leaves.len()
1122 );
1123
1124 *self
1125 .leaf_to_base
1126 .get(leaf)
1127 .unwrap_or_else(|| panic!("Expected a value for index {leaf} but found None"))
1128 }
1129
1130 fn column_root_of(&self, i: usize) -> &TypePtr {
1131 &self.schema.get_fields()[self.get_column_root_idx(i)]
1132 }
1133
1134 pub fn root_schema(&self) -> &Type {
1136 self.schema.as_ref()
1137 }
1138
1139 pub fn root_schema_ptr(&self) -> TypePtr {
1141 self.schema.clone()
1142 }
1143
1144 pub fn name(&self) -> &str {
1146 self.schema.name()
1147 }
1148}
1149
1150pub(crate) fn num_nodes(tp: &TypePtr) -> Result<usize> {
1152 if !tp.is_group() {
1153 return Err(general_err!("Root schema must be Group type"));
1154 }
1155 let mut n_nodes = 1usize; for f in tp.get_fields().iter() {
1157 count_nodes(f, &mut n_nodes);
1158 }
1159 Ok(n_nodes)
1160}
1161
1162pub(crate) fn count_nodes(tp: &TypePtr, n_nodes: &mut usize) {
1163 *n_nodes += 1;
1164 if let Type::GroupType { fields, .. } = tp.as_ref() {
1165 for f in fields {
1166 count_nodes(f, n_nodes);
1167 }
1168 }
1169}
1170
1171fn num_leaves(tp: &TypePtr) -> Result<usize> {
1173 if !tp.is_group() {
1174 return Err(general_err!("Root schema must be Group type"));
1175 }
1176 let mut n_leaves = 0usize;
1177 for f in tp.get_fields().iter() {
1178 count_leaves(f, &mut n_leaves);
1179 }
1180 Ok(n_leaves)
1181}
1182
1183fn count_leaves(tp: &TypePtr, n_leaves: &mut usize) {
1184 match tp.as_ref() {
1185 Type::PrimitiveType { .. } => *n_leaves += 1,
1186 Type::GroupType { fields, .. } => {
1187 for f in fields {
1188 count_leaves(f, n_leaves);
1189 }
1190 }
1191 }
1192}
1193
1194fn build_tree<'a>(
1195 tp: &'a TypePtr,
1196 root_idx: usize,
1197 mut max_rep_level: i16,
1198 mut max_def_level: i16,
1199 leaves: &mut Vec<ColumnDescPtr>,
1200 leaf_to_base: &mut Vec<usize>,
1201 path_so_far: &mut Vec<&'a str>,
1202) {
1203 assert!(tp.get_basic_info().has_repetition());
1204
1205 path_so_far.push(tp.name());
1206 match tp.get_basic_info().repetition() {
1207 Repetition::OPTIONAL => {
1208 max_def_level += 1;
1209 }
1210 Repetition::REPEATED => {
1211 max_def_level += 1;
1212 max_rep_level += 1;
1213 }
1214 _ => {}
1215 }
1216
1217 match tp.as_ref() {
1218 Type::PrimitiveType { .. } => {
1219 let mut path: Vec<String> = vec![];
1220 path.extend(path_so_far.iter().copied().map(String::from));
1221 leaves.push(Arc::new(ColumnDescriptor::new(
1222 tp.clone(),
1223 max_def_level,
1224 max_rep_level,
1225 ColumnPath::new(path),
1226 )));
1227 leaf_to_base.push(root_idx);
1228 }
1229 Type::GroupType { fields, .. } => {
1230 for f in fields {
1231 build_tree(
1232 f,
1233 root_idx,
1234 max_rep_level,
1235 max_def_level,
1236 leaves,
1237 leaf_to_base,
1238 path_so_far,
1239 );
1240 path_so_far.pop();
1241 }
1242 }
1243 }
1244}
1245
1246fn check_logical_type(logical_type: &Option<LogicalType>) -> Result<()> {
1248 if let Some(LogicalType::Integer { bit_width, .. }) = *logical_type {
1249 if bit_width != 8 && bit_width != 16 && bit_width != 32 && bit_width != 64 {
1250 return Err(general_err!(
1251 "Bit width must be 8, 16, 32, or 64 for Integer logical type"
1252 ));
1253 }
1254 }
1255 Ok(())
1256}
1257
1258pub(crate) fn parquet_schema_from_array<'a>(elements: Vec<SchemaElement<'a>>) -> Result<TypePtr> {
1261 let mut index = 0;
1262 let num_elements = elements.len();
1263 let mut schema_nodes = Vec::with_capacity(1); let mut elements = elements.into_iter();
1267
1268 while index < num_elements {
1269 let t = schema_from_array_helper(&mut elements, num_elements, index)?;
1270 index = t.0;
1271 schema_nodes.push(t.1);
1272 }
1273 if schema_nodes.len() != 1 {
1274 return Err(general_err!(
1275 "Expected exactly one root node, but found {}",
1276 schema_nodes.len()
1277 ));
1278 }
1279
1280 if !schema_nodes[0].is_group() {
1281 return Err(general_err!("Expected root node to be a group type"));
1282 }
1283
1284 Ok(schema_nodes.remove(0))
1285}
1286
1287fn schema_from_array_helper<'a>(
1289 elements: &mut IntoIter<SchemaElement<'a>>,
1290 num_elements: usize,
1291 index: usize,
1292) -> Result<(usize, TypePtr)> {
1293 let is_root_node = index == 0;
1296
1297 if index >= num_elements {
1298 return Err(general_err!(
1299 "Index out of bound, index = {}, len = {}",
1300 index,
1301 num_elements
1302 ));
1303 }
1304 let element = elements.next().expect("schema vector should not be empty");
1305
1306 if let (true, None | Some(0)) = (is_root_node, element.num_children) {
1308 let builder = Type::group_type_builder(element.name);
1309 return Ok((index + 1, Arc::new(builder.build().unwrap())));
1310 }
1311
1312 let converted_type = element.converted_type.unwrap_or(ConvertedType::NONE);
1313
1314 let logical_type = element.logical_type;
1316
1317 check_logical_type(&logical_type)?;
1318
1319 let field_id = element.field_id;
1320 match element.num_children {
1321 None | Some(0) => {
1327 if element.repetition_type.is_none() {
1329 return Err(general_err!(
1330 "Repetition level must be defined for a primitive type"
1331 ));
1332 }
1333 let repetition = element.repetition_type.unwrap();
1334 if let Some(physical_type) = element.r#type {
1335 let length = element.type_length.unwrap_or(-1);
1336 let scale = element.scale.unwrap_or(-1);
1337 let precision = element.precision.unwrap_or(-1);
1338 let name = element.name;
1339 let builder = Type::primitive_type_builder(name, physical_type)
1340 .with_repetition(repetition)
1341 .with_converted_type(converted_type)
1342 .with_logical_type(logical_type)
1343 .with_length(length)
1344 .with_precision(precision)
1345 .with_scale(scale)
1346 .with_id(field_id);
1347 Ok((index + 1, Arc::new(builder.build()?)))
1348 } else {
1349 let mut builder = Type::group_type_builder(element.name)
1350 .with_converted_type(converted_type)
1351 .with_logical_type(logical_type)
1352 .with_id(field_id);
1353 if !is_root_node {
1354 builder = builder.with_repetition(repetition);
1362 }
1363 Ok((index + 1, Arc::new(builder.build().unwrap())))
1364 }
1365 }
1366 Some(n) => {
1367 let repetition = element.repetition_type;
1368
1369 let mut fields = Vec::with_capacity(n as usize);
1370 let mut next_index = index + 1;
1371 for _ in 0..n {
1372 let child_result = schema_from_array_helper(elements, num_elements, next_index)?;
1373 next_index = child_result.0;
1374 fields.push(child_result.1);
1375 }
1376
1377 let mut builder = Type::group_type_builder(element.name)
1378 .with_converted_type(converted_type)
1379 .with_logical_type(logical_type)
1380 .with_fields(fields)
1381 .with_id(field_id);
1382
1383 if !is_root_node {
1391 let Some(rep) = repetition else {
1392 return Err(general_err!(
1393 "Repetition level must be defined for non-root types"
1394 ));
1395 };
1396 builder = builder.with_repetition(rep);
1397 }
1398 Ok((next_index, Arc::new(builder.build()?)))
1399 }
1400 }
1401}
1402
1403#[cfg(test)]
1404mod tests {
1405 use super::*;
1406
1407 use crate::{
1408 file::metadata::thrift::tests::{buf_to_schema_list, roundtrip_schema, schema_to_buf},
1409 schema::parser::parse_message_type,
1410 };
1411
1412 #[test]
1415 fn test_primitive_type() {
1416 let mut result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1417 .with_logical_type(Some(LogicalType::Integer {
1418 bit_width: 32,
1419 is_signed: true,
1420 }))
1421 .with_id(Some(0))
1422 .build();
1423 assert!(result.is_ok());
1424
1425 if let Ok(tp) = result {
1426 assert!(tp.is_primitive());
1427 assert!(!tp.is_group());
1428 let basic_info = tp.get_basic_info();
1429 assert_eq!(basic_info.repetition(), Repetition::OPTIONAL);
1430 assert_eq!(
1431 basic_info.logical_type_ref(),
1432 Some(&LogicalType::Integer {
1433 bit_width: 32,
1434 is_signed: true
1435 })
1436 );
1437 assert_eq!(basic_info.converted_type(), ConvertedType::INT_32);
1438 assert_eq!(basic_info.id(), 0);
1439 match tp {
1440 Type::PrimitiveType { physical_type, .. } => {
1441 assert_eq!(physical_type, PhysicalType::INT32);
1442 }
1443 _ => panic!(),
1444 }
1445 }
1446
1447 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1449 .with_repetition(Repetition::REPEATED)
1450 .with_logical_type(Some(LogicalType::Integer {
1451 is_signed: true,
1452 bit_width: 8,
1453 }))
1454 .build();
1455 assert!(result.is_err());
1456 if let Err(e) = result {
1457 assert_eq!(
1458 format!("{e}"),
1459 "Parquet error: Cannot annotate Integer { bit_width: 8, is_signed: true } from INT64 for field 'foo'"
1460 );
1461 }
1462
1463 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1465 .with_repetition(Repetition::REPEATED)
1466 .with_converted_type(ConvertedType::BSON)
1467 .build();
1468 assert!(result.is_err());
1469 if let Err(e) = result {
1470 assert_eq!(
1471 format!("{e}"),
1472 "Parquet error: BSON cannot annotate field 'foo' because it is not a BYTE_ARRAY field"
1473 );
1474 }
1475
1476 result = Type::primitive_type_builder("foo", PhysicalType::INT96)
1477 .with_repetition(Repetition::REQUIRED)
1478 .with_converted_type(ConvertedType::DECIMAL)
1479 .with_precision(-1)
1480 .with_scale(-1)
1481 .build();
1482 assert!(result.is_err());
1483 if let Err(e) = result {
1484 assert_eq!(
1485 format!("{e}"),
1486 "Parquet error: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
1487 );
1488 }
1489
1490 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1491 .with_repetition(Repetition::REQUIRED)
1492 .with_logical_type(Some(LogicalType::Decimal {
1493 scale: 32,
1494 precision: 12,
1495 }))
1496 .with_precision(-1)
1497 .with_scale(-1)
1498 .build();
1499 assert!(result.is_err());
1500 if let Err(e) = result {
1501 assert_eq!(
1502 format!("{e}"),
1503 "Parquet error: DECIMAL logical type scale 32 must match self.scale -1 for field 'foo'"
1504 );
1505 }
1506
1507 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1508 .with_repetition(Repetition::REQUIRED)
1509 .with_converted_type(ConvertedType::DECIMAL)
1510 .with_precision(-1)
1511 .with_scale(-1)
1512 .build();
1513 assert!(result.is_err());
1514 if let Err(e) = result {
1515 assert_eq!(
1516 format!("{e}"),
1517 "Parquet error: Invalid DECIMAL precision: -1"
1518 );
1519 }
1520
1521 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1522 .with_repetition(Repetition::REQUIRED)
1523 .with_converted_type(ConvertedType::DECIMAL)
1524 .with_precision(0)
1525 .with_scale(-1)
1526 .build();
1527 assert!(result.is_err());
1528 if let Err(e) = result {
1529 assert_eq!(
1530 format!("{e}"),
1531 "Parquet error: Invalid DECIMAL precision: 0"
1532 );
1533 }
1534
1535 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1536 .with_repetition(Repetition::REQUIRED)
1537 .with_converted_type(ConvertedType::DECIMAL)
1538 .with_precision(1)
1539 .with_scale(-1)
1540 .build();
1541 assert!(result.is_err());
1542 if let Err(e) = result {
1543 assert_eq!(format!("{e}"), "Parquet error: Invalid DECIMAL scale: -1");
1544 }
1545
1546 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1547 .with_repetition(Repetition::REQUIRED)
1548 .with_converted_type(ConvertedType::DECIMAL)
1549 .with_precision(1)
1550 .with_scale(2)
1551 .build();
1552 assert!(result.is_err());
1553 if let Err(e) = result {
1554 assert_eq!(
1555 format!("{e}"),
1556 "Parquet error: Invalid DECIMAL: scale (2) cannot be greater than precision (1)"
1557 );
1558 }
1559
1560 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1562 .with_repetition(Repetition::REQUIRED)
1563 .with_converted_type(ConvertedType::DECIMAL)
1564 .with_precision(1)
1565 .with_scale(1)
1566 .build();
1567 assert!(result.is_ok());
1568
1569 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1570 .with_repetition(Repetition::REQUIRED)
1571 .with_converted_type(ConvertedType::DECIMAL)
1572 .with_precision(18)
1573 .with_scale(2)
1574 .build();
1575 assert!(result.is_err());
1576 if let Err(e) = result {
1577 assert_eq!(
1578 format!("{e}"),
1579 "Parquet error: Cannot represent INT32 as DECIMAL with precision 18"
1580 );
1581 }
1582
1583 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1584 .with_repetition(Repetition::REQUIRED)
1585 .with_converted_type(ConvertedType::DECIMAL)
1586 .with_precision(32)
1587 .with_scale(2)
1588 .build();
1589 assert!(result.is_err());
1590 if let Err(e) = result {
1591 assert_eq!(
1592 format!("{e}"),
1593 "Parquet error: Cannot represent INT64 as DECIMAL with precision 32"
1594 );
1595 }
1596
1597 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1598 .with_repetition(Repetition::REQUIRED)
1599 .with_converted_type(ConvertedType::DECIMAL)
1600 .with_length(5)
1601 .with_precision(12)
1602 .with_scale(2)
1603 .build();
1604 assert!(result.is_err());
1605 if let Err(e) = result {
1606 assert_eq!(
1607 format!("{e}"),
1608 "Parquet error: Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length 5 and precision 12. The max precision can only be 11"
1609 );
1610 }
1611
1612 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1613 .with_repetition(Repetition::REQUIRED)
1614 .with_converted_type(ConvertedType::UINT_8)
1615 .build();
1616 assert!(result.is_err());
1617 if let Err(e) = result {
1618 assert_eq!(
1619 format!("{e}"),
1620 "Parquet error: UINT_8 cannot annotate field 'foo' because it is not a INT32 field"
1621 );
1622 }
1623
1624 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1625 .with_repetition(Repetition::REQUIRED)
1626 .with_converted_type(ConvertedType::TIME_MICROS)
1627 .build();
1628 assert!(result.is_err());
1629 if let Err(e) = result {
1630 assert_eq!(
1631 format!("{e}"),
1632 "Parquet error: TIME_MICROS cannot annotate field 'foo' because it is not a INT64 field"
1633 );
1634 }
1635
1636 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1637 .with_repetition(Repetition::REQUIRED)
1638 .with_converted_type(ConvertedType::INTERVAL)
1639 .build();
1640 assert!(result.is_err());
1641 if let Err(e) = result {
1642 assert_eq!(
1643 format!("{e}"),
1644 "Parquet error: INTERVAL cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(12) field"
1645 );
1646 }
1647
1648 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1649 .with_repetition(Repetition::REQUIRED)
1650 .with_converted_type(ConvertedType::INTERVAL)
1651 .with_length(1)
1652 .build();
1653 assert!(result.is_err());
1654 if let Err(e) = result {
1655 assert_eq!(
1656 format!("{e}"),
1657 "Parquet error: INTERVAL cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(12) field"
1658 );
1659 }
1660
1661 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1662 .with_repetition(Repetition::REQUIRED)
1663 .with_converted_type(ConvertedType::ENUM)
1664 .build();
1665 assert!(result.is_err());
1666 if let Err(e) = result {
1667 assert_eq!(
1668 format!("{e}"),
1669 "Parquet error: ENUM cannot annotate field 'foo' because it is not a BYTE_ARRAY field"
1670 );
1671 }
1672
1673 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1674 .with_repetition(Repetition::REQUIRED)
1675 .with_converted_type(ConvertedType::MAP)
1676 .build();
1677 assert!(result.is_err());
1678 if let Err(e) = result {
1679 assert_eq!(
1680 format!("{e}"),
1681 "Parquet error: MAP cannot be applied to primitive field 'foo'"
1682 );
1683 }
1684
1685 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1686 .with_repetition(Repetition::REQUIRED)
1687 .with_converted_type(ConvertedType::DECIMAL)
1688 .with_length(-1)
1689 .build();
1690 assert!(result.is_err());
1691 if let Err(e) = result {
1692 assert_eq!(
1693 format!("{e}"),
1694 "Parquet error: Invalid FIXED_LEN_BYTE_ARRAY length: -1 for field 'foo'"
1695 );
1696 }
1697
1698 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1699 .with_repetition(Repetition::REQUIRED)
1700 .with_logical_type(Some(LogicalType::Float16))
1701 .with_length(2)
1702 .build();
1703 assert!(result.is_ok());
1704
1705 result = Type::primitive_type_builder("foo", PhysicalType::FLOAT)
1707 .with_repetition(Repetition::REQUIRED)
1708 .with_logical_type(Some(LogicalType::Float16))
1709 .with_length(2)
1710 .build();
1711 assert!(result.is_err());
1712 if let Err(e) = result {
1713 assert_eq!(
1714 format!("{e}"),
1715 "Parquet error: Cannot annotate Float16 from FLOAT for field 'foo'"
1716 );
1717 }
1718
1719 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1721 .with_repetition(Repetition::REQUIRED)
1722 .with_logical_type(Some(LogicalType::Float16))
1723 .with_length(4)
1724 .build();
1725 assert!(result.is_err());
1726 if let Err(e) = result {
1727 assert_eq!(
1728 format!("{e}"),
1729 "Parquet error: FLOAT16 cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(2) field"
1730 );
1731 }
1732
1733 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1735 .with_repetition(Repetition::REQUIRED)
1736 .with_logical_type(Some(LogicalType::Uuid))
1737 .with_length(15)
1738 .build();
1739 assert!(result.is_err());
1740 if let Err(e) = result {
1741 assert_eq!(
1742 format!("{e}"),
1743 "Parquet error: UUID cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(16) field"
1744 );
1745 }
1746
1747 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1749 .with_logical_type(Some(LogicalType::_Unknown { field_id: 100 }))
1750 .build();
1751 assert!(result.is_ok());
1752 }
1753
#[test]
fn test_group_type() {
    // Build two primitive children, wrap them in a repeated LIST group and
    // verify that every property set on the builder is readable afterwards.
    let int_field = Type::primitive_type_builder("f1", PhysicalType::INT32)
        .with_converted_type(ConvertedType::INT_32)
        .with_id(Some(0))
        .build();
    assert!(int_field.is_ok());
    let utf8_field = Type::primitive_type_builder("f2", PhysicalType::BYTE_ARRAY)
        .with_converted_type(ConvertedType::UTF8)
        .with_id(Some(1))
        .build();
    assert!(utf8_field.is_ok());

    let children = vec![Arc::new(int_field.unwrap()), Arc::new(utf8_field.unwrap())];
    let built = Type::group_type_builder("foo")
        .with_repetition(Repetition::REPEATED)
        .with_logical_type(Some(LogicalType::List))
        .with_fields(children)
        .with_id(Some(1))
        .build();
    assert!(built.is_ok());

    let group = built.unwrap();
    let info = group.get_basic_info();
    assert!(group.is_group());
    assert!(!group.is_primitive());

    // Only the logical type was set explicitly; the test expects the matching
    // converted type (LIST) to be reported as well.
    assert_eq!(info.repetition(), Repetition::REPEATED);
    assert_eq!(info.logical_type_ref(), Some(&LogicalType::List));
    assert_eq!(info.converted_type(), ConvertedType::LIST);
    assert_eq!(info.id(), 1);

    // Children keep the order in which they were handed to the builder.
    let members = group.get_fields();
    assert_eq!(members.len(), 2);
    assert_eq!(members[0].name(), "f1");
    assert_eq!(members[1].name(), "f2");
}
1789
#[test]
fn test_column_descriptor() {
    // The real assertions live in the helper so it can use `?`; here we only
    // turn a returned error into a test failure with a readable message.
    if let Err(err) = test_column_descriptor_helper() {
        panic!("Expected result to be OK but got err:\n {}", err);
    }
}
1799
1800 fn test_column_descriptor_helper() -> Result<()> {
1801 let tp = Type::primitive_type_builder("name", PhysicalType::BYTE_ARRAY)
1802 .with_converted_type(ConvertedType::UTF8)
1803 .build()?;
1804
1805 let descr = ColumnDescriptor::new(Arc::new(tp), 4, 1, ColumnPath::from("name"));
1806
1807 assert_eq!(descr.path(), &ColumnPath::from("name"));
1808 assert_eq!(descr.converted_type(), ConvertedType::UTF8);
1809 assert_eq!(descr.physical_type(), PhysicalType::BYTE_ARRAY);
1810 assert_eq!(descr.max_def_level(), 4);
1811 assert_eq!(descr.max_rep_level(), 1);
1812 assert_eq!(descr.name(), "name");
1813 assert_eq!(descr.type_length(), -1);
1814 assert_eq!(descr.type_precision(), -1);
1815 assert_eq!(descr.type_scale(), -1);
1816
1817 Ok(())
1818 }
1819
#[test]
fn test_schema_descriptor() {
    // The real assertions live in the helper so it can use `?`; here we only
    // turn a returned error into a test failure with a readable message.
    if let Err(err) = test_schema_descriptor_helper() {
        panic!("Expected result to be OK but got err:\n {}", err);
    }
}
1829
1830 fn test_schema_descriptor_helper() -> Result<()> {
1832 let mut fields = vec![];
1833
1834 let inta = Type::primitive_type_builder("a", PhysicalType::INT32)
1835 .with_repetition(Repetition::REQUIRED)
1836 .with_converted_type(ConvertedType::INT_32)
1837 .build()?;
1838 fields.push(Arc::new(inta));
1839 let intb = Type::primitive_type_builder("b", PhysicalType::INT64)
1840 .with_converted_type(ConvertedType::INT_64)
1841 .build()?;
1842 fields.push(Arc::new(intb));
1843 let intc = Type::primitive_type_builder("c", PhysicalType::BYTE_ARRAY)
1844 .with_repetition(Repetition::REPEATED)
1845 .with_converted_type(ConvertedType::UTF8)
1846 .build()?;
1847 fields.push(Arc::new(intc));
1848
1849 let item1 = Type::primitive_type_builder("item1", PhysicalType::INT64)
1851 .with_repetition(Repetition::REQUIRED)
1852 .with_converted_type(ConvertedType::INT_64)
1853 .build()?;
1854 let item2 = Type::primitive_type_builder("item2", PhysicalType::BOOLEAN).build()?;
1855 let item3 = Type::primitive_type_builder("item3", PhysicalType::INT32)
1856 .with_repetition(Repetition::REPEATED)
1857 .with_converted_type(ConvertedType::INT_32)
1858 .build()?;
1859 let list = Type::group_type_builder("records")
1860 .with_repetition(Repetition::REPEATED)
1861 .with_converted_type(ConvertedType::LIST)
1862 .with_fields(vec![Arc::new(item1), Arc::new(item2), Arc::new(item3)])
1863 .build()?;
1864 let bag = Type::group_type_builder("bag")
1865 .with_repetition(Repetition::OPTIONAL)
1866 .with_fields(vec![Arc::new(list)])
1867 .build()?;
1868 fields.push(Arc::new(bag));
1869
1870 let schema = Type::group_type_builder("schema")
1871 .with_repetition(Repetition::REPEATED)
1872 .with_fields(fields)
1873 .build()?;
1874 let descr = SchemaDescriptor::new(Arc::new(schema));
1875
1876 let nleaves = 6;
1877 assert_eq!(descr.num_columns(), nleaves);
1878
1879 let ex_max_def_levels = [0, 1, 1, 2, 3, 3];
1889 let ex_max_rep_levels = [0, 0, 1, 1, 1, 2];
1890
1891 for i in 0..nleaves {
1892 let col = descr.column(i);
1893 assert_eq!(col.max_def_level(), ex_max_def_levels[i], "{i}");
1894 assert_eq!(col.max_rep_level(), ex_max_rep_levels[i], "{i}");
1895 }
1896
1897 assert_eq!(descr.column(0).path().string(), "a");
1898 assert_eq!(descr.column(1).path().string(), "b");
1899 assert_eq!(descr.column(2).path().string(), "c");
1900 assert_eq!(descr.column(3).path().string(), "bag.records.item1");
1901 assert_eq!(descr.column(4).path().string(), "bag.records.item2");
1902 assert_eq!(descr.column(5).path().string(), "bag.records.item3");
1903
1904 assert_eq!(descr.get_column_root(0).name(), "a");
1905 assert_eq!(descr.get_column_root(3).name(), "bag");
1906 assert_eq!(descr.get_column_root(4).name(), "bag");
1907 assert_eq!(descr.get_column_root(5).name(), "bag");
1908
1909 Ok(())
1910 }
1911
#[test]
fn test_schema_build_tree_def_rep_levels() {
    // Parse a schema mixing a required leaf, an optional struct and an
    // optional three-level list, then check the max definition/repetition
    // level computed for every leaf.
    let message_type = "
    message spark_schema {
      REQUIRED INT32 a;
      OPTIONAL group b {
        OPTIONAL INT32 _1;
        OPTIONAL INT32 _2;
      }
      OPTIONAL group c (LIST) {
        REPEATED group list {
          OPTIONAL INT32 element;
        }
      }
    }
    ";
    let root = parse_message_type(message_type).expect("should parse schema");
    let descr = SchemaDescriptor::new(Arc::new(root));

    // (max_def_level, max_rep_level) for leaves 0..=3, expected to be
    // `a`, `b._1`, `b._2` and `c.list.element` in that order:
    //   a              - required                      -> (0, 0)
    //   b._1, b._2     - optional inside optional      -> (2, 0)
    //   c.list.element - optional / repeated / optional -> (3, 1)
    let expected_levels = [(0, 0), (2, 0), (2, 0), (3, 1)];
    for (i, &(max_def, max_rep)) in expected_levels.iter().enumerate() {
        let col = descr.column(i);
        assert_eq!(col.max_def_level(), max_def);
        assert_eq!(col.max_rep_level(), max_rep);
    }
}
1943
#[test]
#[should_panic(expected = "Cannot call get_physical_type() on a non-primitive type")]
fn test_get_physical_type_panic() {
    // A group has no physical type, so asking for one must panic with the
    // message named in the attribute above.
    let group = Type::group_type_builder("records")
        .with_repetition(Repetition::REPEATED)
        .build()
        .unwrap();
    let _ = group.get_physical_type();
}
1953
#[test]
fn test_get_physical_type_primitive() {
    // A primitive type must report exactly the physical type it was built with.
    for physical in [PhysicalType::INT64, PhysicalType::BYTE_ARRAY] {
        let f = Type::primitive_type_builder("f", physical)
            .build()
            .unwrap();
        assert_eq!(f.get_physical_type(), physical);
    }
}
1966
#[test]
fn test_check_contains_primitive_primitive() {
    // Shorthand for a primitive built with nothing but a name and physical type.
    let plain = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };

    // Same name and same physical type: contained.
    let lhs = plain("f", PhysicalType::INT32);
    let rhs = plain("f", PhysicalType::INT32);
    assert!(lhs.check_contains(&rhs));

    // A differing converted type is expected not to matter.
    let lhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_converted_type(ConvertedType::UINT_8)
        .build()
        .unwrap();
    let rhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_converted_type(ConvertedType::UINT_16)
        .build()
        .unwrap();
    assert!(lhs.check_contains(&rhs));

    // Different names: not contained.
    let lhs = plain("f1", PhysicalType::INT32);
    let rhs = plain("f2", PhysicalType::INT32);
    assert!(!lhs.check_contains(&rhs));

    // Different physical types: not contained.
    let lhs = plain("f", PhysicalType::INT32);
    let rhs = plain("f", PhysicalType::INT64);
    assert!(!lhs.check_contains(&rhs));

    // Different repetition: not contained.
    let lhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED)
        .build()
        .unwrap();
    let rhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    assert!(!lhs.check_contains(&rhs));
}
2018
2019 fn test_new_group_type(name: &str, repetition: Repetition, types: Vec<Type>) -> Type {
2021 Type::group_type_builder(name)
2022 .with_repetition(repetition)
2023 .with_fields(types.into_iter().map(Arc::new).collect())
2024 .build()
2025 .unwrap()
2026 }
2027
#[test]
fn test_check_contains_group_group() {
    // Shorthands: a bare primitive leaf, and a REPEATED group named "f".
    let leaf = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };
    let repeated_f =
        |children: Vec<Type>| test_new_group_type("f", Repetition::REPEATED, children);

    // Two empty groups with the same name contain each other; a group built
    // without an explicit repetition is also expected not to be optional.
    let outer = Type::group_type_builder("f").build().unwrap();
    let inner = Type::group_type_builder("f").build().unwrap();
    assert!(outer.check_contains(&inner));
    assert!(!outer.is_optional());

    // Identical field lists: contained.
    let outer = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::INT64),
    ]);
    let inner = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::INT64),
    ]);
    assert!(outer.check_contains(&inner));

    // The inner group carries only a subset of the outer fields: contained.
    let outer = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::INT64),
    ]);
    let inner = repeated_f(vec![leaf("f2", PhysicalType::INT64)]);
    assert!(outer.check_contains(&inner));

    // Different group names: not contained.
    let outer = Type::group_type_builder("f1").build().unwrap();
    let inner = Type::group_type_builder("f2").build().unwrap();
    assert!(!outer.check_contains(&inner));

    // Different group repetition: not contained.
    let outer = Type::group_type_builder("f")
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    let inner = Type::group_type_builder("f")
        .with_repetition(Repetition::REPEATED)
        .build()
        .unwrap();
    assert!(!outer.check_contains(&inner));

    // A same-named field with a different physical type: not contained.
    let outer = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::INT64),
    ]);
    let inner = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::BOOLEAN),
    ]);
    assert!(!outer.check_contains(&inner));

    // The inner group has a field the outer one lacks: not contained.
    let outer = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::INT64),
    ]);
    let inner = repeated_f(vec![leaf("f3", PhysicalType::INT32)]);
    assert!(!outer.check_contains(&inner));
}
2154
#[test]
fn test_check_contains_group_primitive() {
    // Shorthands: a bare primitive leaf, and a REPEATED group with children.
    let leaf = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };
    let repeated = |name: &str, children: Vec<Type>| {
        test_new_group_type(name, Repetition::REPEATED, children)
    };

    // A group and a primitive sharing a name never contain each other.
    let group = Type::group_type_builder("f").build().unwrap();
    let primitive = leaf("f", PhysicalType::INT64);
    assert!(!group.check_contains(&primitive));
    assert!(!primitive.check_contains(&group));

    // A group is not "contained" by, nor does it contain, a bare primitive
    // that merely matches one of its children.
    let group = repeated("f", vec![leaf("f1", PhysicalType::INT32)]);
    let primitive = leaf("f1", PhysicalType::INT32);
    assert!(!group.check_contains(&primitive));
    assert!(!primitive.check_contains(&group));

    // Nested case: a { b { c }, d, e } contains a { b { c } }, but the
    // smaller schema does not contain the larger one.
    let wide = repeated(
        "a",
        vec![
            repeated("b", vec![leaf("c", PhysicalType::INT32)]),
            leaf("d", PhysicalType::INT64),
            leaf("e", PhysicalType::BOOLEAN),
        ],
    );
    let narrow = repeated(
        "a",
        vec![repeated("b", vec![leaf("c", PhysicalType::INT32)])],
    );
    assert!(wide.check_contains(&narrow));
    assert!(!narrow.check_contains(&wide));
}
2219
#[test]
fn test_schema_type_thrift_conversion_err() {
    // Serializing a schema whose root is a primitive (not a group) must fail
    // with the specific error message checked below.
    let root = Arc::new(
        Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap(),
    );
    let outcome = schema_to_buf(&root);
    assert!(outcome.is_err());

    let message = outcome.err().map(|e| format!("{e}"));
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Root schema must be Group type")
    );
}
2235
#[test]
fn test_schema_type_thrift_conversion() {
    // Round-trip a schema with nested lists, maps and structs through
    // `roundtrip_schema` and require the result to equal the input.
    let message_type = "
    message conversions {
      REQUIRED INT64 id;
      OPTIONAL FIXED_LEN_BYTE_ARRAY (2) f16 (FLOAT16);
      OPTIONAL group int_array_Array (LIST) {
        REPEATED group list {
          OPTIONAL group element (LIST) {
            REPEATED group list {
              OPTIONAL INT32 element;
            }
          }
        }
      }
      OPTIONAL group int_map (MAP) {
        REPEATED group map (MAP_KEY_VALUE) {
          REQUIRED BYTE_ARRAY key (UTF8);
          OPTIONAL INT32 value;
        }
      }
      OPTIONAL group int_Map_Array (LIST) {
        REPEATED group list {
          OPTIONAL group g (MAP) {
            REPEATED group map (MAP_KEY_VALUE) {
              REQUIRED BYTE_ARRAY key (UTF8);
              OPTIONAL group value {
                OPTIONAL group H {
                  OPTIONAL group i (LIST) {
                    REPEATED group list {
                      OPTIONAL DOUBLE element;
                    }
                  }
                }
              }
            }
          }
        }
      }
      OPTIONAL group nested_struct {
        OPTIONAL INT32 A;
        OPTIONAL group b (LIST) {
          REPEATED group list {
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) element;
          }
        }
      }
    }
    ";
    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let actual = roundtrip_schema(Arc::clone(&expected)).unwrap();
    assert_eq!(actual, expected);
}
2289
#[test]
fn test_schema_type_thrift_conversion_decimal() {
    // Round-trip DECIMAL annotations on INT64, FIXED_LEN_BYTE_ARRAY and
    // BYTE_ARRAY (the last one with precision only) next to a plain INT32.
    let message_type = "
    message decimals {
      OPTIONAL INT32 field0;
      OPTIONAL INT64 field1 (DECIMAL (18, 2));
      OPTIONAL FIXED_LEN_BYTE_ARRAY (16) field2 (DECIMAL (38, 18));
      OPTIONAL BYTE_ARRAY field3 (DECIMAL (9));
    }
    ";
    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let actual = roundtrip_schema(Arc::clone(&expected)).unwrap();
    assert_eq!(actual, expected);
}
2304
#[test]
fn test_schema_from_thrift_with_num_children_set() {
    // Schema elements whose `num_children` is `Some(0)` instead of `None`
    // must still convert back to the original schema.
    let message_type = "
    message schema {
      OPTIONAL BYTE_ARRAY id (UTF8);
      OPTIONAL BYTE_ARRAY name (UTF8);
      OPTIONAL BYTE_ARRAY message (UTF8);
      OPTIONAL INT32 type (UINT_8);
      OPTIONAL INT64 author_time (TIMESTAMP_MILLIS);
      OPTIONAL INT64 __index_level_0__;
    }
    ";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut buf = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut buf).unwrap();

    // Replace every missing child count with an explicit zero; elements that
    // already carry a count are left untouched.
    let missing_count = elements[..]
        .iter_mut()
        .filter(|element| element.num_children.is_none());
    for element in missing_count {
        element.num_children = Some(0);
    }

    let actual = parquet_schema_from_array(elements).unwrap();
    assert_eq!(actual, expected);
}
2335
#[test]
fn test_schema_from_thrift_root_has_repetition() {
    // Setting a repetition on the root schema element must not change the
    // schema produced by the conversion.
    let message_type = "
    message schema {
      OPTIONAL BYTE_ARRAY a (UTF8);
      OPTIONAL INT32 b (UINT_8);
    }
    ";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut buf = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut buf).unwrap();

    // Element 0 is the root of the flattened schema list.
    let root = &mut elements[0];
    root.repetition_type = Some(Repetition::REQUIRED);

    let actual = parquet_schema_from_array(elements).unwrap();
    assert_eq!(actual, expected);
}
2356
#[test]
fn test_schema_from_thrift_group_has_no_child() {
    // A root group without any fields must survive the conversion, even when
    // a repetition is set on the root element.
    let message_type = "message schema {}";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut buf = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut buf).unwrap();

    // Element 0 is the root of the flattened schema list.
    let root = &mut elements[0];
    root.repetition_type = Some(Repetition::REQUIRED);

    let actual = parquet_schema_from_array(elements).unwrap();
    assert_eq!(actual, expected);
}
2369}