1use std::vec::IntoIter;
21use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::file::metadata::thrift_gen::SchemaElement;
24use crate::file::metadata::HeapSize;
25
26use crate::basic::{
27 ColumnOrder, ConvertedType, LogicalType, Repetition, SortOrder, TimeUnit, Type as PhysicalType,
28};
29use crate::errors::{ParquetError, Result};
30
/// Shared, reference-counted pointer to a schema [`Type`] node.
pub type TypePtr = Arc<Type>;
/// Shared, reference-counted pointer to a [`SchemaDescriptor`].
pub type SchemaDescPtr = Arc<SchemaDescriptor>;
/// Shared, reference-counted pointer to a [`ColumnDescriptor`].
pub type ColumnDescPtr = Arc<ColumnDescriptor>;
40
/// A node of a Parquet schema tree.
///
/// A node is either a primitive leaf ([`Type::PrimitiveType`]) or a group of
/// child nodes ([`Type::GroupType`]). Instances are created through
/// [`Type::primitive_type_builder`] and [`Type::group_type_builder`].
#[derive(Clone, Debug, PartialEq)]
pub enum Type {
    /// A leaf node that carries a physical storage type.
    PrimitiveType {
        /// Name, repetition, converted/logical type and id of the node.
        basic_info: BasicTypeInfo,
        /// Physical type of the stored values.
        physical_type: PhysicalType,
        /// Type length as supplied to the builder (`-1` when it was not set).
        type_length: i32,
        /// Decimal scale as supplied to the builder (`-1` when it was not set).
        scale: i32,
        /// Decimal precision as supplied to the builder (`-1` when it was not set).
        precision: i32,
    },
    /// An inner node that owns a list of child nodes.
    GroupType {
        /// Name, repetition, converted/logical type and id of the node.
        basic_info: BasicTypeInfo,
        /// Child nodes, in declaration order.
        fields: Vec<TypePtr>,
    },
}
70
71impl HeapSize for Type {
72 fn heap_size(&self) -> usize {
73 match self {
74 Type::PrimitiveType { basic_info, .. } => basic_info.heap_size(),
75 Type::GroupType { basic_info, fields } => basic_info.heap_size() + fields.heap_size(),
76 }
77 }
78}
79
80impl Type {
81 pub fn primitive_type_builder(
83 name: &str,
84 physical_type: PhysicalType,
85 ) -> PrimitiveTypeBuilder<'_> {
86 PrimitiveTypeBuilder::new(name, physical_type)
87 }
88
89 pub fn group_type_builder(name: &str) -> GroupTypeBuilder<'_> {
91 GroupTypeBuilder::new(name)
92 }
93
94 pub fn get_basic_info(&self) -> &BasicTypeInfo {
96 match *self {
97 Type::PrimitiveType { ref basic_info, .. } => basic_info,
98 Type::GroupType { ref basic_info, .. } => basic_info,
99 }
100 }
101
102 pub fn name(&self) -> &str {
104 self.get_basic_info().name()
105 }
106
107 pub fn get_fields(&self) -> &[TypePtr] {
111 match *self {
112 Type::GroupType { ref fields, .. } => &fields[..],
113 _ => panic!("Cannot call get_fields() on a non-group type"),
114 }
115 }
116
117 pub fn get_physical_type(&self) -> PhysicalType {
120 match *self {
121 Type::PrimitiveType {
122 basic_info: _,
123 physical_type,
124 ..
125 } => physical_type,
126 _ => panic!("Cannot call get_physical_type() on a non-primitive type"),
127 }
128 }
129
130 pub fn get_precision(&self) -> i32 {
133 match *self {
134 Type::PrimitiveType { precision, .. } => precision,
135 _ => panic!("Cannot call get_precision() on non-primitive type"),
136 }
137 }
138
139 pub fn get_scale(&self) -> i32 {
142 match *self {
143 Type::PrimitiveType { scale, .. } => scale,
144 _ => panic!("Cannot call get_scale() on non-primitive type"),
145 }
146 }
147
148 pub fn check_contains(&self, sub_type: &Type) -> bool {
151 let basic_match = self.get_basic_info().name() == sub_type.get_basic_info().name()
153 && (self.is_schema() && sub_type.is_schema()
154 || !self.is_schema()
155 && !sub_type.is_schema()
156 && self.get_basic_info().repetition()
157 == sub_type.get_basic_info().repetition());
158
159 match *self {
160 Type::PrimitiveType { .. } if basic_match && sub_type.is_primitive() => {
161 self.get_physical_type() == sub_type.get_physical_type()
162 }
163 Type::GroupType { .. } if basic_match && sub_type.is_group() => {
164 let mut field_map = HashMap::new();
166 for field in self.get_fields() {
167 field_map.insert(field.name(), field);
168 }
169
170 for field in sub_type.get_fields() {
171 if !field_map
172 .get(field.name())
173 .map(|tpe| tpe.check_contains(field))
174 .unwrap_or(false)
175 {
176 return false;
177 }
178 }
179 true
180 }
181 _ => false,
182 }
183 }
184
185 pub fn is_primitive(&self) -> bool {
187 matches!(*self, Type::PrimitiveType { .. })
188 }
189
190 pub fn is_group(&self) -> bool {
192 matches!(*self, Type::GroupType { .. })
193 }
194
195 pub fn is_schema(&self) -> bool {
197 match *self {
198 Type::GroupType { ref basic_info, .. } => !basic_info.has_repetition(),
199 _ => false,
200 }
201 }
202
203 pub fn is_optional(&self) -> bool {
206 self.get_basic_info().has_repetition()
207 && self.get_basic_info().repetition() != Repetition::REQUIRED
208 }
209
210 pub(crate) fn is_list(&self) -> bool {
212 if self.is_group() {
213 let basic_info = self.get_basic_info();
214 if let Some(logical_type) = basic_info.logical_type() {
215 return logical_type == LogicalType::List;
216 }
217 return basic_info.converted_type() == ConvertedType::LIST;
218 }
219 false
220 }
221
222 pub(crate) fn has_single_repeated_child(&self) -> bool {
224 if self.is_group() {
225 let children = self.get_fields();
226 return children.len() == 1
227 && children[0].get_basic_info().has_repetition()
228 && children[0].get_basic_info().repetition() == Repetition::REPEATED;
229 }
230 false
231 }
232}
233
/// Builder for [`Type::PrimitiveType`] nodes.
///
/// Obtain one through [`Type::primitive_type_builder`], configure it with the
/// `with_*` methods and finish with `build`, which validates the combination.
pub struct PrimitiveTypeBuilder<'a> {
    /// Field name.
    name: &'a str,
    /// Field repetition.
    repetition: Repetition,
    /// Physical storage type.
    physical_type: PhysicalType,
    /// Converted type annotation (`ConvertedType::NONE` when unset).
    converted_type: ConvertedType,
    /// Logical type annotation, if any.
    logical_type: Option<LogicalType>,
    /// Type length; `-1` when unset.
    length: i32,
    /// Decimal precision; `-1` when unset.
    precision: i32,
    /// Decimal scale; `-1` when unset.
    scale: i32,
    /// Optional field id.
    id: Option<i32>,
}
248
249impl<'a> PrimitiveTypeBuilder<'a> {
250 pub fn new(name: &'a str, physical_type: PhysicalType) -> Self {
252 Self {
253 name,
254 repetition: Repetition::OPTIONAL,
255 physical_type,
256 converted_type: ConvertedType::NONE,
257 logical_type: None,
258 length: -1,
259 precision: -1,
260 scale: -1,
261 id: None,
262 }
263 }
264
265 pub fn with_repetition(self, repetition: Repetition) -> Self {
267 Self { repetition, ..self }
268 }
269
270 pub fn with_converted_type(self, converted_type: ConvertedType) -> Self {
272 Self {
273 converted_type,
274 ..self
275 }
276 }
277
278 pub fn with_logical_type(self, logical_type: Option<LogicalType>) -> Self {
282 Self {
283 logical_type,
284 ..self
285 }
286 }
287
288 pub fn with_length(self, length: i32) -> Self {
293 Self { length, ..self }
294 }
295
296 pub fn with_precision(self, precision: i32) -> Self {
299 Self { precision, ..self }
300 }
301
302 pub fn with_scale(self, scale: i32) -> Self {
305 Self { scale, ..self }
306 }
307
308 pub fn with_id(self, id: Option<i32>) -> Self {
310 Self { id, ..self }
311 }
312
313 pub fn build(self) -> Result<Type> {
316 let mut basic_info = BasicTypeInfo {
317 name: String::from(self.name),
318 repetition: Some(self.repetition),
319 converted_type: self.converted_type,
320 logical_type: self.logical_type.clone(),
321 id: self.id,
322 };
323
324 if self.physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY && self.length < 0 {
326 return Err(general_err!(
327 "Invalid FIXED_LEN_BYTE_ARRAY length: {} for field '{}'",
328 self.length,
329 self.name
330 ));
331 }
332
333 if let Some(logical_type) = &self.logical_type {
334 if self.converted_type != ConvertedType::NONE {
337 if ConvertedType::from(self.logical_type.clone()) != self.converted_type {
338 return Err(general_err!(
339 "Logical type {:?} is incompatible with converted type {} for field '{}'",
340 logical_type,
341 self.converted_type,
342 self.name
343 ));
344 }
345 } else {
346 basic_info.converted_type = self.logical_type.clone().into();
348 }
349 match (logical_type, self.physical_type) {
351 (LogicalType::Map, _) | (LogicalType::List, _) => {
352 return Err(general_err!(
353 "{:?} cannot be applied to a primitive type for field '{}'",
354 logical_type,
355 self.name
356 ));
357 }
358 (LogicalType::Enum, PhysicalType::BYTE_ARRAY) => {}
359 (LogicalType::Decimal { scale, precision }, _) => {
360 if *scale != self.scale {
362 return Err(general_err!(
363 "DECIMAL logical type scale {} must match self.scale {} for field '{}'",
364 scale,
365 self.scale,
366 self.name
367 ));
368 }
369 if *precision != self.precision {
370 return Err(general_err!(
371 "DECIMAL logical type precision {} must match self.precision {} for field '{}'",
372 precision,
373 self.precision,
374 self.name
375 ));
376 }
377 self.check_decimal_precision_scale()?;
378 }
379 (LogicalType::Date, PhysicalType::INT32) => {}
380 (
381 LogicalType::Time {
382 unit: TimeUnit::MILLIS,
383 ..
384 },
385 PhysicalType::INT32,
386 ) => {}
387 (LogicalType::Time { unit, .. }, PhysicalType::INT64) => {
388 if *unit == TimeUnit::MILLIS {
389 return Err(general_err!(
390 "Cannot use millisecond unit on INT64 type for field '{}'",
391 self.name
392 ));
393 }
394 }
395 (LogicalType::Timestamp { .. }, PhysicalType::INT64) => {}
396 (LogicalType::Integer { bit_width, .. }, PhysicalType::INT32)
397 if *bit_width <= 32 => {}
398 (LogicalType::Integer { bit_width, .. }, PhysicalType::INT64)
399 if *bit_width == 64 => {}
400 (LogicalType::Unknown, PhysicalType::INT32) => {}
402 (LogicalType::String, PhysicalType::BYTE_ARRAY) => {}
403 (LogicalType::Json, PhysicalType::BYTE_ARRAY) => {}
404 (LogicalType::Bson, PhysicalType::BYTE_ARRAY) => {}
405 (LogicalType::Geometry { .. }, PhysicalType::BYTE_ARRAY) => {}
406 (LogicalType::Geography { .. }, PhysicalType::BYTE_ARRAY) => {}
407 (LogicalType::Uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY) if self.length == 16 => {}
408 (LogicalType::Uuid, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {
409 return Err(general_err!(
410 "UUID cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(16) field",
411 self.name
412 ))
413 }
414 (LogicalType::Float16, PhysicalType::FIXED_LEN_BYTE_ARRAY)
415 if self.length == 2 => {}
416 (LogicalType::Float16, PhysicalType::FIXED_LEN_BYTE_ARRAY) => {
417 return Err(general_err!(
418 "FLOAT16 cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(2) field",
419 self.name
420 ))
421 }
422 (a, b) => {
423 return Err(general_err!(
424 "Cannot annotate {:?} from {} for field '{}'",
425 a,
426 b,
427 self.name
428 ))
429 }
430 }
431 }
432
433 match self.converted_type {
434 ConvertedType::NONE => {}
435 ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => {
436 if self.physical_type != PhysicalType::BYTE_ARRAY {
437 return Err(general_err!(
438 "{} cannot annotate field '{}' because it is not a BYTE_ARRAY field",
439 self.converted_type,
440 self.name
441 ));
442 }
443 }
444 ConvertedType::DECIMAL => {
445 self.check_decimal_precision_scale()?;
446 }
447 ConvertedType::DATE
448 | ConvertedType::TIME_MILLIS
449 | ConvertedType::UINT_8
450 | ConvertedType::UINT_16
451 | ConvertedType::UINT_32
452 | ConvertedType::INT_8
453 | ConvertedType::INT_16
454 | ConvertedType::INT_32 => {
455 if self.physical_type != PhysicalType::INT32 {
456 return Err(general_err!(
457 "{} cannot annotate field '{}' because it is not a INT32 field",
458 self.converted_type,
459 self.name
460 ));
461 }
462 }
463 ConvertedType::TIME_MICROS
464 | ConvertedType::TIMESTAMP_MILLIS
465 | ConvertedType::TIMESTAMP_MICROS
466 | ConvertedType::UINT_64
467 | ConvertedType::INT_64 => {
468 if self.physical_type != PhysicalType::INT64 {
469 return Err(general_err!(
470 "{} cannot annotate field '{}' because it is not a INT64 field",
471 self.converted_type,
472 self.name
473 ));
474 }
475 }
476 ConvertedType::INTERVAL => {
477 if self.physical_type != PhysicalType::FIXED_LEN_BYTE_ARRAY || self.length != 12 {
478 return Err(general_err!(
479 "INTERVAL cannot annotate field '{}' because it is not a FIXED_LEN_BYTE_ARRAY(12) field",
480 self.name
481 ));
482 }
483 }
484 ConvertedType::ENUM => {
485 if self.physical_type != PhysicalType::BYTE_ARRAY {
486 return Err(general_err!(
487 "ENUM cannot annotate field '{}' because it is not a BYTE_ARRAY field",
488 self.name
489 ));
490 }
491 }
492 _ => {
493 return Err(general_err!(
494 "{} cannot be applied to primitive field '{}'",
495 self.converted_type,
496 self.name
497 ));
498 }
499 }
500
501 Ok(Type::PrimitiveType {
502 basic_info,
503 physical_type: self.physical_type,
504 type_length: self.length,
505 scale: self.scale,
506 precision: self.precision,
507 })
508 }
509
510 #[inline]
511 fn check_decimal_precision_scale(&self) -> Result<()> {
512 match self.physical_type {
513 PhysicalType::INT32
514 | PhysicalType::INT64
515 | PhysicalType::BYTE_ARRAY
516 | PhysicalType::FIXED_LEN_BYTE_ARRAY => (),
517 _ => {
518 return Err(general_err!(
519 "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
520 ));
521 }
522 }
523
524 if self.precision < 1 {
526 return Err(general_err!(
527 "Invalid DECIMAL precision: {}",
528 self.precision
529 ));
530 }
531
532 if self.scale < 0 {
534 return Err(general_err!("Invalid DECIMAL scale: {}", self.scale));
535 }
536
537 if self.scale > self.precision {
538 return Err(general_err!(
539 "Invalid DECIMAL: scale ({}) cannot be greater than precision \
540 ({})",
541 self.scale,
542 self.precision
543 ));
544 }
545
546 match self.physical_type {
548 PhysicalType::INT32 => {
549 if self.precision > 9 {
550 return Err(general_err!(
551 "Cannot represent INT32 as DECIMAL with precision {}",
552 self.precision
553 ));
554 }
555 }
556 PhysicalType::INT64 => {
557 if self.precision > 18 {
558 return Err(general_err!(
559 "Cannot represent INT64 as DECIMAL with precision {}",
560 self.precision
561 ));
562 }
563 }
564 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
565 let length = self
566 .length
567 .checked_mul(8)
568 .ok_or(general_err!("Invalid length {} for Decimal", self.length))?;
569 let max_precision = (2f64.powi(length - 1) - 1f64).log10().floor() as i32;
570
571 if self.precision > max_precision {
572 return Err(general_err!(
573 "Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \
574 precision {}. The max precision can only be {}",
575 self.length,
576 self.precision,
577 max_precision
578 ));
579 }
580 }
581 _ => (), }
583
584 Ok(())
585 }
586}
587
/// Builder for [`Type::GroupType`] nodes.
///
/// Obtain one through [`Type::group_type_builder`], configure it with the
/// `with_*` methods and finish with `build`.
pub struct GroupTypeBuilder<'a> {
    /// Group name.
    name: &'a str,
    /// Group repetition; `None` until `with_repetition` is called.
    repetition: Option<Repetition>,
    /// Converted type annotation (`ConvertedType::NONE` when unset).
    converted_type: ConvertedType,
    /// Logical type annotation, if any.
    logical_type: Option<LogicalType>,
    /// Child nodes of the group.
    fields: Vec<TypePtr>,
    /// Optional field id.
    id: Option<i32>,
}
599
600impl<'a> GroupTypeBuilder<'a> {
601 pub fn new(name: &'a str) -> Self {
603 Self {
604 name,
605 repetition: None,
606 converted_type: ConvertedType::NONE,
607 logical_type: None,
608 fields: Vec::new(),
609 id: None,
610 }
611 }
612
613 pub fn with_repetition(mut self, repetition: Repetition) -> Self {
615 self.repetition = Some(repetition);
616 self
617 }
618
619 pub fn with_converted_type(self, converted_type: ConvertedType) -> Self {
621 Self {
622 converted_type,
623 ..self
624 }
625 }
626
627 pub fn with_logical_type(self, logical_type: Option<LogicalType>) -> Self {
629 Self {
630 logical_type,
631 ..self
632 }
633 }
634
635 pub fn with_fields(self, fields: Vec<TypePtr>) -> Self {
638 Self { fields, ..self }
639 }
640
641 pub fn with_id(self, id: Option<i32>) -> Self {
643 Self { id, ..self }
644 }
645
646 pub fn build(self) -> Result<Type> {
648 let mut basic_info = BasicTypeInfo {
649 name: String::from(self.name),
650 repetition: self.repetition,
651 converted_type: self.converted_type,
652 logical_type: self.logical_type.clone(),
653 id: self.id,
654 };
655 if self.logical_type.is_some() && self.converted_type == ConvertedType::NONE {
657 basic_info.converted_type = self.logical_type.into();
658 }
659 Ok(Type::GroupType {
660 basic_info,
661 fields: self.fields,
662 })
663 }
664}
665
/// Attributes shared by primitive and group schema nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BasicTypeInfo {
    /// Node name.
    name: String,
    /// Node repetition; `None` when no repetition was set (see `Type::is_schema`).
    repetition: Option<Repetition>,
    /// Converted type annotation.
    converted_type: ConvertedType,
    /// Logical type annotation, if any.
    logical_type: Option<LogicalType>,
    /// Optional field id.
    id: Option<i32>,
}
676
677impl HeapSize for BasicTypeInfo {
678 fn heap_size(&self) -> usize {
679 self.name.heap_size()
681 }
682}
683
684impl BasicTypeInfo {
685 pub fn name(&self) -> &str {
687 &self.name
688 }
689
690 pub fn has_repetition(&self) -> bool {
694 self.repetition.is_some()
695 }
696
697 pub fn repetition(&self) -> Repetition {
699 assert!(self.repetition.is_some());
700 self.repetition.unwrap()
701 }
702
703 pub fn converted_type(&self) -> ConvertedType {
705 self.converted_type
706 }
707
708 pub fn logical_type(&self) -> Option<LogicalType> {
710 self.logical_type.clone()
712 }
713
714 pub fn has_id(&self) -> bool {
716 self.id.is_some()
717 }
718
719 pub fn id(&self) -> i32 {
721 assert!(self.id.is_some());
722 self.id.unwrap()
723 }
724}
725
/// Path of a column inside a schema, stored as a list of name parts.
///
/// The path is rendered by joining the parts with `"."` (see `ColumnPath::string`).
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
pub struct ColumnPath {
    /// Name parts of the path, in order.
    parts: Vec<String>,
}
751
752impl HeapSize for ColumnPath {
753 fn heap_size(&self) -> usize {
754 self.parts.heap_size()
755 }
756}
757
758impl ColumnPath {
759 pub fn new(parts: Vec<String>) -> Self {
761 ColumnPath { parts }
762 }
763
764 pub fn string(&self) -> String {
772 self.parts.join(".")
773 }
774
775 pub fn append(&mut self, mut tail: Vec<String>) {
787 self.parts.append(&mut tail);
788 }
789
790 pub fn parts(&self) -> &[String] {
792 &self.parts
793 }
794}
795
796impl fmt::Display for ColumnPath {
797 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
798 write!(f, "{:?}", self.string())
799 }
800}
801
802impl From<Vec<String>> for ColumnPath {
803 fn from(parts: Vec<String>) -> Self {
804 ColumnPath { parts }
805 }
806}
807
808impl From<&str> for ColumnPath {
809 fn from(single_path: &str) -> Self {
810 let s = String::from(single_path);
811 ColumnPath::from(s)
812 }
813}
814
815impl From<String> for ColumnPath {
816 fn from(single_path: String) -> Self {
817 let v = vec![single_path];
818 ColumnPath { parts: v }
819 }
820}
821
822impl AsRef<[String]> for ColumnPath {
823 fn as_ref(&self) -> &[String] {
824 &self.parts
825 }
826}
827
/// Describes one leaf column of a schema: its primitive type, its maximum
/// definition and repetition levels, and its path from the schema root.
#[derive(Debug, PartialEq)]
pub struct ColumnDescriptor {
    /// The type node of this column; the accessors expect it to be primitive.
    primitive_type: TypePtr,

    /// Maximum definition level of this column.
    max_def_level: i16,

    /// Maximum repetition level of this column.
    max_rep_level: i16,

    /// Path of this column in the schema.
    path: ColumnPath,
}
846
847impl HeapSize for ColumnDescriptor {
848 fn heap_size(&self) -> usize {
849 self.primitive_type.heap_size() + self.path.heap_size()
850 }
851}
852
853impl ColumnDescriptor {
854 pub fn new(
856 primitive_type: TypePtr,
857 max_def_level: i16,
858 max_rep_level: i16,
859 path: ColumnPath,
860 ) -> Self {
861 Self {
862 primitive_type,
863 max_def_level,
864 max_rep_level,
865 path,
866 }
867 }
868
869 #[inline]
871 pub fn max_def_level(&self) -> i16 {
872 self.max_def_level
873 }
874
875 #[inline]
877 pub fn max_rep_level(&self) -> i16 {
878 self.max_rep_level
879 }
880
881 pub fn path(&self) -> &ColumnPath {
883 &self.path
884 }
885
886 pub fn self_type(&self) -> &Type {
888 self.primitive_type.as_ref()
889 }
890
891 pub fn self_type_ptr(&self) -> TypePtr {
894 self.primitive_type.clone()
895 }
896
897 pub fn name(&self) -> &str {
899 self.primitive_type.name()
900 }
901
902 pub fn converted_type(&self) -> ConvertedType {
904 self.primitive_type.get_basic_info().converted_type()
905 }
906
907 pub fn logical_type(&self) -> Option<LogicalType> {
909 self.primitive_type.get_basic_info().logical_type()
910 }
911
912 pub fn physical_type(&self) -> PhysicalType {
915 match self.primitive_type.as_ref() {
916 Type::PrimitiveType { physical_type, .. } => *physical_type,
917 _ => panic!("Expected primitive type!"),
918 }
919 }
920
921 pub fn type_length(&self) -> i32 {
924 match self.primitive_type.as_ref() {
925 Type::PrimitiveType { type_length, .. } => *type_length,
926 _ => panic!("Expected primitive type!"),
927 }
928 }
929
930 pub fn type_precision(&self) -> i32 {
933 match self.primitive_type.as_ref() {
934 Type::PrimitiveType { precision, .. } => *precision,
935 _ => panic!("Expected primitive type!"),
936 }
937 }
938
939 pub fn type_scale(&self) -> i32 {
942 match self.primitive_type.as_ref() {
943 Type::PrimitiveType { scale, .. } => *scale,
944 _ => panic!("Expected primitive type!"),
945 }
946 }
947
948 pub fn sort_order(&self) -> SortOrder {
950 ColumnOrder::get_sort_order(
951 self.logical_type(),
952 self.converted_type(),
953 self.physical_type(),
954 )
955 }
956}
957
/// A schema root together with a flattened view of its leaf columns.
#[derive(PartialEq)]
pub struct SchemaDescriptor {
    /// Root of the schema tree; must be a group type.
    schema: TypePtr,

    /// Descriptors of all leaf (primitive) columns, in depth-first order of
    /// the schema tree.
    leaves: Vec<ColumnDescPtr>,

    /// For each leaf index, the index of the top-level field of `schema`
    /// under which that leaf is located.
    leaf_to_base: Vec<usize>,
}
1013
1014impl fmt::Debug for SchemaDescriptor {
1015 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1016 f.debug_struct("SchemaDescriptor")
1018 .field("schema", &self.schema)
1019 .finish()
1020 }
1021}
1022
1023impl HeapSize for SchemaDescriptor {
1025 fn heap_size(&self) -> usize {
1026 self.schema.heap_size() + self.leaves.heap_size() + self.leaf_to_base.heap_size()
1027 }
1028}
1029
1030impl SchemaDescriptor {
1031 pub fn new(tp: TypePtr) -> Self {
1033 const INIT_SCHEMA_DEPTH: usize = 16;
1034 assert!(tp.is_group(), "SchemaDescriptor should take a GroupType");
1035 let n_leaves = num_leaves(&tp).unwrap();
1037 let mut leaves = Vec::with_capacity(n_leaves);
1038 let mut leaf_to_base = Vec::with_capacity(n_leaves);
1039 let mut path = Vec::with_capacity(INIT_SCHEMA_DEPTH);
1040 for (root_idx, f) in tp.get_fields().iter().enumerate() {
1041 path.clear();
1042 build_tree(f, root_idx, 0, 0, &mut leaves, &mut leaf_to_base, &mut path);
1043 }
1044
1045 Self {
1046 schema: tp,
1047 leaves,
1048 leaf_to_base,
1049 }
1050 }
1051
1052 pub fn column(&self, i: usize) -> ColumnDescPtr {
1054 assert!(
1055 i < self.leaves.len(),
1056 "Index out of bound: {} not in [0, {})",
1057 i,
1058 self.leaves.len()
1059 );
1060 self.leaves[i].clone()
1061 }
1062
1063 pub fn columns(&self) -> &[ColumnDescPtr] {
1065 &self.leaves
1066 }
1067
1068 pub fn num_columns(&self) -> usize {
1070 self.leaves.len()
1071 }
1072
1073 pub fn get_column_root(&self, i: usize) -> &Type {
1075 let result = self.column_root_of(i);
1076 result.as_ref()
1077 }
1078
1079 pub fn get_column_root_ptr(&self, i: usize) -> TypePtr {
1081 let result = self.column_root_of(i);
1082 result.clone()
1083 }
1084
1085 pub fn get_column_root_idx(&self, leaf: usize) -> usize {
1087 assert!(
1088 leaf < self.leaves.len(),
1089 "Index out of bound: {} not in [0, {})",
1090 leaf,
1091 self.leaves.len()
1092 );
1093
1094 *self
1095 .leaf_to_base
1096 .get(leaf)
1097 .unwrap_or_else(|| panic!("Expected a value for index {leaf} but found None"))
1098 }
1099
1100 fn column_root_of(&self, i: usize) -> &TypePtr {
1101 &self.schema.get_fields()[self.get_column_root_idx(i)]
1102 }
1103
1104 pub fn root_schema(&self) -> &Type {
1106 self.schema.as_ref()
1107 }
1108
1109 pub fn root_schema_ptr(&self) -> TypePtr {
1111 self.schema.clone()
1112 }
1113
1114 pub fn name(&self) -> &str {
1116 self.schema.name()
1117 }
1118}
1119
1120pub(crate) fn num_nodes(tp: &TypePtr) -> Result<usize> {
1122 if !tp.is_group() {
1123 return Err(general_err!("Root schema must be Group type"));
1124 }
1125 let mut n_nodes = 1usize; for f in tp.get_fields().iter() {
1127 count_nodes(f, &mut n_nodes);
1128 }
1129 Ok(n_nodes)
1130}
1131
1132pub(crate) fn count_nodes(tp: &TypePtr, n_nodes: &mut usize) {
1133 *n_nodes += 1;
1134 if let Type::GroupType { ref fields, .. } = tp.as_ref() {
1135 for f in fields {
1136 count_nodes(f, n_nodes);
1137 }
1138 }
1139}
1140
1141fn num_leaves(tp: &TypePtr) -> Result<usize> {
1143 if !tp.is_group() {
1144 return Err(general_err!("Root schema must be Group type"));
1145 }
1146 let mut n_leaves = 0usize;
1147 for f in tp.get_fields().iter() {
1148 count_leaves(f, &mut n_leaves);
1149 }
1150 Ok(n_leaves)
1151}
1152
1153fn count_leaves(tp: &TypePtr, n_leaves: &mut usize) {
1154 match tp.as_ref() {
1155 Type::PrimitiveType { .. } => *n_leaves += 1,
1156 Type::GroupType { ref fields, .. } => {
1157 for f in fields {
1158 count_leaves(f, n_leaves);
1159 }
1160 }
1161 }
1162}
1163
1164fn build_tree<'a>(
1165 tp: &'a TypePtr,
1166 root_idx: usize,
1167 mut max_rep_level: i16,
1168 mut max_def_level: i16,
1169 leaves: &mut Vec<ColumnDescPtr>,
1170 leaf_to_base: &mut Vec<usize>,
1171 path_so_far: &mut Vec<&'a str>,
1172) {
1173 assert!(tp.get_basic_info().has_repetition());
1174
1175 path_so_far.push(tp.name());
1176 match tp.get_basic_info().repetition() {
1177 Repetition::OPTIONAL => {
1178 max_def_level += 1;
1179 }
1180 Repetition::REPEATED => {
1181 max_def_level += 1;
1182 max_rep_level += 1;
1183 }
1184 _ => {}
1185 }
1186
1187 match tp.as_ref() {
1188 Type::PrimitiveType { .. } => {
1189 let mut path: Vec<String> = vec![];
1190 path.extend(path_so_far.iter().copied().map(String::from));
1191 leaves.push(Arc::new(ColumnDescriptor::new(
1192 tp.clone(),
1193 max_def_level,
1194 max_rep_level,
1195 ColumnPath::new(path),
1196 )));
1197 leaf_to_base.push(root_idx);
1198 }
1199 Type::GroupType { ref fields, .. } => {
1200 for f in fields {
1201 build_tree(
1202 f,
1203 root_idx,
1204 max_rep_level,
1205 max_def_level,
1206 leaves,
1207 leaf_to_base,
1208 path_so_far,
1209 );
1210 path_so_far.pop();
1211 }
1212 }
1213 }
1214}
1215
1216fn check_logical_type(logical_type: &Option<LogicalType>) -> Result<()> {
1218 if let Some(LogicalType::Integer { bit_width, .. }) = *logical_type {
1219 if bit_width != 8 && bit_width != 16 && bit_width != 32 && bit_width != 64 {
1220 return Err(general_err!(
1221 "Bit width must be 8, 16, 32, or 64 for Integer logical type"
1222 ));
1223 }
1224 }
1225 Ok(())
1226}
1227
1228pub(crate) fn parquet_schema_from_array<'a>(elements: Vec<SchemaElement<'a>>) -> Result<TypePtr> {
1231 let mut index = 0;
1232 let num_elements = elements.len();
1233 let mut schema_nodes = Vec::with_capacity(1); let mut elements = elements.into_iter();
1237
1238 while index < num_elements {
1239 let t = schema_from_array_helper(&mut elements, num_elements, index)?;
1240 index = t.0;
1241 schema_nodes.push(t.1);
1242 }
1243 if schema_nodes.len() != 1 {
1244 return Err(general_err!(
1245 "Expected exactly one root node, but found {}",
1246 schema_nodes.len()
1247 ));
1248 }
1249
1250 if !schema_nodes[0].is_group() {
1251 return Err(general_err!("Expected root node to be a group type"));
1252 }
1253
1254 Ok(schema_nodes.remove(0))
1255}
1256
1257fn schema_from_array_helper<'a>(
1259 elements: &mut IntoIter<SchemaElement<'a>>,
1260 num_elements: usize,
1261 index: usize,
1262) -> Result<(usize, TypePtr)> {
1263 let is_root_node = index == 0;
1266
1267 if index >= num_elements {
1268 return Err(general_err!(
1269 "Index out of bound, index = {}, len = {}",
1270 index,
1271 num_elements
1272 ));
1273 }
1274 let element = elements.next().expect("schema vector should not be empty");
1275
1276 if let (true, None | Some(0)) = (is_root_node, element.num_children) {
1278 let builder = Type::group_type_builder(element.name);
1279 return Ok((index + 1, Arc::new(builder.build().unwrap())));
1280 }
1281
1282 let converted_type = element.converted_type.unwrap_or(ConvertedType::NONE);
1283
1284 let logical_type = element.logical_type;
1286
1287 check_logical_type(&logical_type)?;
1288
1289 let field_id = element.field_id;
1290 match element.num_children {
1291 None | Some(0) => {
1297 if element.repetition_type.is_none() {
1299 return Err(general_err!(
1300 "Repetition level must be defined for a primitive type"
1301 ));
1302 }
1303 let repetition = element.repetition_type.unwrap();
1304 if let Some(physical_type) = element.r#type {
1305 let length = element.type_length.unwrap_or(-1);
1306 let scale = element.scale.unwrap_or(-1);
1307 let precision = element.precision.unwrap_or(-1);
1308 let name = element.name;
1309 let builder = Type::primitive_type_builder(name, physical_type)
1310 .with_repetition(repetition)
1311 .with_converted_type(converted_type)
1312 .with_logical_type(logical_type)
1313 .with_length(length)
1314 .with_precision(precision)
1315 .with_scale(scale)
1316 .with_id(field_id);
1317 Ok((index + 1, Arc::new(builder.build()?)))
1318 } else {
1319 let mut builder = Type::group_type_builder(element.name)
1320 .with_converted_type(converted_type)
1321 .with_logical_type(logical_type)
1322 .with_id(field_id);
1323 if !is_root_node {
1324 builder = builder.with_repetition(repetition);
1332 }
1333 Ok((index + 1, Arc::new(builder.build().unwrap())))
1334 }
1335 }
1336 Some(n) => {
1337 let repetition = element.repetition_type;
1338
1339 let mut fields = Vec::with_capacity(n as usize);
1340 let mut next_index = index + 1;
1341 for _ in 0..n {
1342 let child_result = schema_from_array_helper(elements, num_elements, next_index)?;
1343 next_index = child_result.0;
1344 fields.push(child_result.1);
1345 }
1346
1347 let mut builder = Type::group_type_builder(element.name)
1348 .with_converted_type(converted_type)
1349 .with_logical_type(logical_type)
1350 .with_fields(fields)
1351 .with_id(field_id);
1352 if let Some(rep) = repetition {
1353 if !is_root_node {
1361 builder = builder.with_repetition(rep);
1362 }
1363 }
1364 Ok((next_index, Arc::new(builder.build().unwrap())))
1365 }
1366 }
1367}
1368
1369#[cfg(test)]
1370mod tests {
1371 use super::*;
1372
1373 use crate::{
1374 file::metadata::thrift_gen::tests::{buf_to_schema_list, roundtrip_schema, schema_to_buf},
1375 schema::parser::parse_message_type,
1376 };
1377
1378 #[test]
1381 fn test_primitive_type() {
1382 let mut result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1383 .with_logical_type(Some(LogicalType::Integer {
1384 bit_width: 32,
1385 is_signed: true,
1386 }))
1387 .with_id(Some(0))
1388 .build();
1389 assert!(result.is_ok());
1390
1391 if let Ok(tp) = result {
1392 assert!(tp.is_primitive());
1393 assert!(!tp.is_group());
1394 let basic_info = tp.get_basic_info();
1395 assert_eq!(basic_info.repetition(), Repetition::OPTIONAL);
1396 assert_eq!(
1397 basic_info.logical_type(),
1398 Some(LogicalType::Integer {
1399 bit_width: 32,
1400 is_signed: true
1401 })
1402 );
1403 assert_eq!(basic_info.converted_type(), ConvertedType::INT_32);
1404 assert_eq!(basic_info.id(), 0);
1405 match tp {
1406 Type::PrimitiveType { physical_type, .. } => {
1407 assert_eq!(physical_type, PhysicalType::INT32);
1408 }
1409 _ => panic!(),
1410 }
1411 }
1412
1413 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1415 .with_repetition(Repetition::REPEATED)
1416 .with_logical_type(Some(LogicalType::Integer {
1417 is_signed: true,
1418 bit_width: 8,
1419 }))
1420 .build();
1421 assert!(result.is_err());
1422 if let Err(e) = result {
1423 assert_eq!(
1424 format!("{e}"),
1425 "Parquet error: Cannot annotate Integer { bit_width: 8, is_signed: true } from INT64 for field 'foo'"
1426 );
1427 }
1428
1429 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1431 .with_repetition(Repetition::REPEATED)
1432 .with_converted_type(ConvertedType::BSON)
1433 .build();
1434 assert!(result.is_err());
1435 if let Err(e) = result {
1436 assert_eq!(
1437 format!("{e}"),
1438 "Parquet error: BSON cannot annotate field 'foo' because it is not a BYTE_ARRAY field"
1439 );
1440 }
1441
1442 result = Type::primitive_type_builder("foo", PhysicalType::INT96)
1443 .with_repetition(Repetition::REQUIRED)
1444 .with_converted_type(ConvertedType::DECIMAL)
1445 .with_precision(-1)
1446 .with_scale(-1)
1447 .build();
1448 assert!(result.is_err());
1449 if let Err(e) = result {
1450 assert_eq!(
1451 format!("{e}"),
1452 "Parquet error: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
1453 );
1454 }
1455
1456 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1457 .with_repetition(Repetition::REQUIRED)
1458 .with_logical_type(Some(LogicalType::Decimal {
1459 scale: 32,
1460 precision: 12,
1461 }))
1462 .with_precision(-1)
1463 .with_scale(-1)
1464 .build();
1465 assert!(result.is_err());
1466 if let Err(e) = result {
1467 assert_eq!(
1468 format!("{e}"),
1469 "Parquet error: DECIMAL logical type scale 32 must match self.scale -1 for field 'foo'"
1470 );
1471 }
1472
1473 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1474 .with_repetition(Repetition::REQUIRED)
1475 .with_converted_type(ConvertedType::DECIMAL)
1476 .with_precision(-1)
1477 .with_scale(-1)
1478 .build();
1479 assert!(result.is_err());
1480 if let Err(e) = result {
1481 assert_eq!(
1482 format!("{e}"),
1483 "Parquet error: Invalid DECIMAL precision: -1"
1484 );
1485 }
1486
1487 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1488 .with_repetition(Repetition::REQUIRED)
1489 .with_converted_type(ConvertedType::DECIMAL)
1490 .with_precision(0)
1491 .with_scale(-1)
1492 .build();
1493 assert!(result.is_err());
1494 if let Err(e) = result {
1495 assert_eq!(
1496 format!("{e}"),
1497 "Parquet error: Invalid DECIMAL precision: 0"
1498 );
1499 }
1500
1501 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1502 .with_repetition(Repetition::REQUIRED)
1503 .with_converted_type(ConvertedType::DECIMAL)
1504 .with_precision(1)
1505 .with_scale(-1)
1506 .build();
1507 assert!(result.is_err());
1508 if let Err(e) = result {
1509 assert_eq!(format!("{e}"), "Parquet error: Invalid DECIMAL scale: -1");
1510 }
1511
1512 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1513 .with_repetition(Repetition::REQUIRED)
1514 .with_converted_type(ConvertedType::DECIMAL)
1515 .with_precision(1)
1516 .with_scale(2)
1517 .build();
1518 assert!(result.is_err());
1519 if let Err(e) = result {
1520 assert_eq!(
1521 format!("{e}"),
1522 "Parquet error: Invalid DECIMAL: scale (2) cannot be greater than precision (1)"
1523 );
1524 }
1525
1526 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1528 .with_repetition(Repetition::REQUIRED)
1529 .with_converted_type(ConvertedType::DECIMAL)
1530 .with_precision(1)
1531 .with_scale(1)
1532 .build();
1533 assert!(result.is_ok());
1534
1535 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1536 .with_repetition(Repetition::REQUIRED)
1537 .with_converted_type(ConvertedType::DECIMAL)
1538 .with_precision(18)
1539 .with_scale(2)
1540 .build();
1541 assert!(result.is_err());
1542 if let Err(e) = result {
1543 assert_eq!(
1544 format!("{e}"),
1545 "Parquet error: Cannot represent INT32 as DECIMAL with precision 18"
1546 );
1547 }
1548
1549 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1550 .with_repetition(Repetition::REQUIRED)
1551 .with_converted_type(ConvertedType::DECIMAL)
1552 .with_precision(32)
1553 .with_scale(2)
1554 .build();
1555 assert!(result.is_err());
1556 if let Err(e) = result {
1557 assert_eq!(
1558 format!("{e}"),
1559 "Parquet error: Cannot represent INT64 as DECIMAL with precision 32"
1560 );
1561 }
1562
1563 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1564 .with_repetition(Repetition::REQUIRED)
1565 .with_converted_type(ConvertedType::DECIMAL)
1566 .with_length(5)
1567 .with_precision(12)
1568 .with_scale(2)
1569 .build();
1570 assert!(result.is_err());
1571 if let Err(e) = result {
1572 assert_eq!(
1573 format!("{e}"),
1574 "Parquet error: Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length 5 and precision 12. The max precision can only be 11"
1575 );
1576 }
1577
1578 result = Type::primitive_type_builder("foo", PhysicalType::INT64)
1579 .with_repetition(Repetition::REQUIRED)
1580 .with_converted_type(ConvertedType::UINT_8)
1581 .build();
1582 assert!(result.is_err());
1583 if let Err(e) = result {
1584 assert_eq!(
1585 format!("{e}"),
1586 "Parquet error: UINT_8 cannot annotate field 'foo' because it is not a INT32 field"
1587 );
1588 }
1589
1590 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1591 .with_repetition(Repetition::REQUIRED)
1592 .with_converted_type(ConvertedType::TIME_MICROS)
1593 .build();
1594 assert!(result.is_err());
1595 if let Err(e) = result {
1596 assert_eq!(
1597 format!("{e}"),
1598 "Parquet error: TIME_MICROS cannot annotate field 'foo' because it is not a INT64 field"
1599 );
1600 }
1601
1602 result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
1603 .with_repetition(Repetition::REQUIRED)
1604 .with_converted_type(ConvertedType::INTERVAL)
1605 .build();
1606 assert!(result.is_err());
1607 if let Err(e) = result {
1608 assert_eq!(
1609 format!("{e}"),
1610 "Parquet error: INTERVAL cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(12) field"
1611 );
1612 }
1613
1614 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1615 .with_repetition(Repetition::REQUIRED)
1616 .with_converted_type(ConvertedType::INTERVAL)
1617 .with_length(1)
1618 .build();
1619 assert!(result.is_err());
1620 if let Err(e) = result {
1621 assert_eq!(
1622 format!("{e}"),
1623 "Parquet error: INTERVAL cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(12) field"
1624 );
1625 }
1626
1627 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1628 .with_repetition(Repetition::REQUIRED)
1629 .with_converted_type(ConvertedType::ENUM)
1630 .build();
1631 assert!(result.is_err());
1632 if let Err(e) = result {
1633 assert_eq!(
1634 format!("{e}"),
1635 "Parquet error: ENUM cannot annotate field 'foo' because it is not a BYTE_ARRAY field"
1636 );
1637 }
1638
1639 result = Type::primitive_type_builder("foo", PhysicalType::INT32)
1640 .with_repetition(Repetition::REQUIRED)
1641 .with_converted_type(ConvertedType::MAP)
1642 .build();
1643 assert!(result.is_err());
1644 if let Err(e) = result {
1645 assert_eq!(
1646 format!("{e}"),
1647 "Parquet error: MAP cannot be applied to primitive field 'foo'"
1648 );
1649 }
1650
1651 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1652 .with_repetition(Repetition::REQUIRED)
1653 .with_converted_type(ConvertedType::DECIMAL)
1654 .with_length(-1)
1655 .build();
1656 assert!(result.is_err());
1657 if let Err(e) = result {
1658 assert_eq!(
1659 format!("{e}"),
1660 "Parquet error: Invalid FIXED_LEN_BYTE_ARRAY length: -1 for field 'foo'"
1661 );
1662 }
1663
1664 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1665 .with_repetition(Repetition::REQUIRED)
1666 .with_logical_type(Some(LogicalType::Float16))
1667 .with_length(2)
1668 .build();
1669 assert!(result.is_ok());
1670
1671 result = Type::primitive_type_builder("foo", PhysicalType::FLOAT)
1673 .with_repetition(Repetition::REQUIRED)
1674 .with_logical_type(Some(LogicalType::Float16))
1675 .with_length(2)
1676 .build();
1677 assert!(result.is_err());
1678 if let Err(e) = result {
1679 assert_eq!(
1680 format!("{e}"),
1681 "Parquet error: Cannot annotate Float16 from FLOAT for field 'foo'"
1682 );
1683 }
1684
1685 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1687 .with_repetition(Repetition::REQUIRED)
1688 .with_logical_type(Some(LogicalType::Float16))
1689 .with_length(4)
1690 .build();
1691 assert!(result.is_err());
1692 if let Err(e) = result {
1693 assert_eq!(
1694 format!("{e}"),
1695 "Parquet error: FLOAT16 cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(2) field"
1696 );
1697 }
1698
1699 result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1701 .with_repetition(Repetition::REQUIRED)
1702 .with_logical_type(Some(LogicalType::Uuid))
1703 .with_length(15)
1704 .build();
1705 assert!(result.is_err());
1706 if let Err(e) = result {
1707 assert_eq!(
1708 format!("{e}"),
1709 "Parquet error: UUID cannot annotate field 'foo' because it is not a FIXED_LEN_BYTE_ARRAY(16) field"
1710 );
1711 }
1712 }
1713
/// Builds a REPEATED group annotated as a LIST with two primitive children
/// and verifies the group's basic info and its child fields.
#[test]
fn test_group_type() {
    let int_child = Type::primitive_type_builder("f1", PhysicalType::INT32)
        .with_converted_type(ConvertedType::INT_32)
        .with_id(Some(0))
        .build();
    assert!(int_child.is_ok());

    let str_child = Type::primitive_type_builder("f2", PhysicalType::BYTE_ARRAY)
        .with_converted_type(ConvertedType::UTF8)
        .with_id(Some(1))
        .build();
    assert!(str_child.is_ok());

    let group = Type::group_type_builder("foo")
        .with_repetition(Repetition::REPEATED)
        .with_logical_type(Some(LogicalType::List))
        .with_fields(vec![
            Arc::new(int_child.unwrap()),
            Arc::new(str_child.unwrap()),
        ])
        .with_id(Some(1))
        .build();
    assert!(group.is_ok());
    let group = group.unwrap();

    // The built type must report itself as a group, never as a primitive.
    assert!(group.is_group());
    assert!(!group.is_primitive());

    // Only the logical type was set on the builder; the converted type is
    // expected to come back as LIST as well.
    let info = group.get_basic_info();
    assert_eq!(info.repetition(), Repetition::REPEATED);
    assert_eq!(info.logical_type(), Some(LogicalType::List));
    assert_eq!(info.converted_type(), ConvertedType::LIST);
    assert_eq!(info.id(), 1);

    // Children are expected in the order they were supplied.
    let children = group.get_fields();
    assert_eq!(children.len(), 2);
    for (child, expected_name) in children.iter().zip(["f1", "f2"]) {
        assert_eq!(child.name(), expected_name);
    }
}
1749
/// Runs the fallible column-descriptor checks and fails the test with the
/// error's display text if the helper returns an error.
#[test]
fn test_column_descriptor() {
    if let Err(err) = test_column_descriptor_helper() {
        panic!("Expected result to be OK but got err:\n {}", err);
    }
}
1759
1760 fn test_column_descriptor_helper() -> Result<()> {
1761 let tp = Type::primitive_type_builder("name", PhysicalType::BYTE_ARRAY)
1762 .with_converted_type(ConvertedType::UTF8)
1763 .build()?;
1764
1765 let descr = ColumnDescriptor::new(Arc::new(tp), 4, 1, ColumnPath::from("name"));
1766
1767 assert_eq!(descr.path(), &ColumnPath::from("name"));
1768 assert_eq!(descr.converted_type(), ConvertedType::UTF8);
1769 assert_eq!(descr.physical_type(), PhysicalType::BYTE_ARRAY);
1770 assert_eq!(descr.max_def_level(), 4);
1771 assert_eq!(descr.max_rep_level(), 1);
1772 assert_eq!(descr.name(), "name");
1773 assert_eq!(descr.type_length(), -1);
1774 assert_eq!(descr.type_precision(), -1);
1775 assert_eq!(descr.type_scale(), -1);
1776
1777 Ok(())
1778 }
1779
/// Runs the fallible schema-descriptor checks and fails the test with the
/// error's display text if the helper returns an error.
#[test]
fn test_schema_descriptor() {
    if let Err(err) = test_schema_descriptor_helper() {
        panic!("Expected result to be OK but got err:\n {}", err);
    }
}
1789
1790 fn test_schema_descriptor_helper() -> Result<()> {
1792 let mut fields = vec![];
1793
1794 let inta = Type::primitive_type_builder("a", PhysicalType::INT32)
1795 .with_repetition(Repetition::REQUIRED)
1796 .with_converted_type(ConvertedType::INT_32)
1797 .build()?;
1798 fields.push(Arc::new(inta));
1799 let intb = Type::primitive_type_builder("b", PhysicalType::INT64)
1800 .with_converted_type(ConvertedType::INT_64)
1801 .build()?;
1802 fields.push(Arc::new(intb));
1803 let intc = Type::primitive_type_builder("c", PhysicalType::BYTE_ARRAY)
1804 .with_repetition(Repetition::REPEATED)
1805 .with_converted_type(ConvertedType::UTF8)
1806 .build()?;
1807 fields.push(Arc::new(intc));
1808
1809 let item1 = Type::primitive_type_builder("item1", PhysicalType::INT64)
1811 .with_repetition(Repetition::REQUIRED)
1812 .with_converted_type(ConvertedType::INT_64)
1813 .build()?;
1814 let item2 = Type::primitive_type_builder("item2", PhysicalType::BOOLEAN).build()?;
1815 let item3 = Type::primitive_type_builder("item3", PhysicalType::INT32)
1816 .with_repetition(Repetition::REPEATED)
1817 .with_converted_type(ConvertedType::INT_32)
1818 .build()?;
1819 let list = Type::group_type_builder("records")
1820 .with_repetition(Repetition::REPEATED)
1821 .with_converted_type(ConvertedType::LIST)
1822 .with_fields(vec![Arc::new(item1), Arc::new(item2), Arc::new(item3)])
1823 .build()?;
1824 let bag = Type::group_type_builder("bag")
1825 .with_repetition(Repetition::OPTIONAL)
1826 .with_fields(vec![Arc::new(list)])
1827 .build()?;
1828 fields.push(Arc::new(bag));
1829
1830 let schema = Type::group_type_builder("schema")
1831 .with_repetition(Repetition::REPEATED)
1832 .with_fields(fields)
1833 .build()?;
1834 let descr = SchemaDescriptor::new(Arc::new(schema));
1835
1836 let nleaves = 6;
1837 assert_eq!(descr.num_columns(), nleaves);
1838
1839 let ex_max_def_levels = [0, 1, 1, 2, 3, 3];
1849 let ex_max_rep_levels = [0, 0, 1, 1, 1, 2];
1850
1851 for i in 0..nleaves {
1852 let col = descr.column(i);
1853 assert_eq!(col.max_def_level(), ex_max_def_levels[i], "{i}");
1854 assert_eq!(col.max_rep_level(), ex_max_rep_levels[i], "{i}");
1855 }
1856
1857 assert_eq!(descr.column(0).path().string(), "a");
1858 assert_eq!(descr.column(1).path().string(), "b");
1859 assert_eq!(descr.column(2).path().string(), "c");
1860 assert_eq!(descr.column(3).path().string(), "bag.records.item1");
1861 assert_eq!(descr.column(4).path().string(), "bag.records.item2");
1862 assert_eq!(descr.column(5).path().string(), "bag.records.item3");
1863
1864 assert_eq!(descr.get_column_root(0).name(), "a");
1865 assert_eq!(descr.get_column_root(3).name(), "bag");
1866 assert_eq!(descr.get_column_root(4).name(), "bag");
1867 assert_eq!(descr.get_column_root(5).name(), "bag");
1868
1869 Ok(())
1870 }
1871
/// Parses a schema with a required leaf, an optional struct and an optional
/// LIST, and checks the max definition/repetition levels of every leaf.
#[test]
fn test_schema_build_tree_def_rep_levels() {
    let message_type = "
    message spark_schema {
      REQUIRED INT32 a;
      OPTIONAL group b {
        OPTIONAL INT32 _1;
        OPTIONAL INT32 _2;
      }
      OPTIONAL group c (LIST) {
        REPEATED group list {
          OPTIONAL INT32 element;
        }
      }
    }
    ";
    let parsed = parse_message_type(message_type).expect("should parse schema");
    let descr = SchemaDescriptor::new(Arc::new(parsed));

    // (max_def_level, max_rep_level) per leaf, in column order:
    //   0: a
    //   1: b._1
    //   2: b._2
    //   3: c.list.element
    let expected_levels = [(0, 0), (2, 0), (2, 0), (3, 1)];
    for (column_index, &(max_def, max_rep)) in expected_levels.iter().enumerate() {
        let column = descr.column(column_index);
        assert_eq!(column.max_def_level(), max_def);
        assert_eq!(column.max_rep_level(), max_rep);
    }
}
1903
/// Asking a group type for its physical type must panic with the documented
/// message.
#[test]
#[should_panic(expected = "Cannot call get_physical_type() on a non-primitive type")]
fn test_get_physical_type_panic() {
    let builder = Type::group_type_builder("records").with_repetition(Repetition::REPEATED);
    let records = builder.build().unwrap();

    // The call itself is the assertion: it is expected to panic.
    let _ = records.get_physical_type();
}
1913
/// A primitive type reports the physical type it was built with.
#[test]
fn test_get_physical_type_primitive() {
    for physical in [PhysicalType::INT64, PhysicalType::BYTE_ARRAY] {
        let primitive = Type::primitive_type_builder("f", physical)
            .build()
            .unwrap();
        assert_eq!(primitive.get_physical_type(), physical);
    }
}
1926
/// Exercises `check_contains` between two primitive types: it is expected to
/// hold when name, physical type and repetition agree (the converted type is
/// not compared), and to fail when any of those three differ.
#[test]
fn test_check_contains_primitive_primitive() {
    // Primitive with default repetition and no annotations.
    let plain = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };

    // Identical primitives.
    let lhs = plain("f", PhysicalType::INT32);
    let rhs = plain("f", PhysicalType::INT32);
    assert!(lhs.check_contains(&rhs));

    // Differing converted types still match.
    let lhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_converted_type(ConvertedType::UINT_8)
        .build()
        .unwrap();
    let rhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_converted_type(ConvertedType::UINT_16)
        .build()
        .unwrap();
    assert!(lhs.check_contains(&rhs));

    // Different names do not match.
    let lhs = plain("f1", PhysicalType::INT32);
    let rhs = plain("f2", PhysicalType::INT32);
    assert!(!lhs.check_contains(&rhs));

    // Different physical types do not match.
    let lhs = plain("f", PhysicalType::INT32);
    let rhs = plain("f", PhysicalType::INT64);
    assert!(!lhs.check_contains(&rhs));

    // Different repetitions do not match.
    let lhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED)
        .build()
        .unwrap();
    let rhs = Type::primitive_type_builder("f", PhysicalType::INT32)
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    assert!(!lhs.check_contains(&rhs));
}
1978
1979 fn test_new_group_type(name: &str, repetition: Repetition, types: Vec<Type>) -> Type {
1981 Type::group_type_builder(name)
1982 .with_repetition(repetition)
1983 .with_fields(types.into_iter().map(Arc::new).collect())
1984 .build()
1985 .unwrap()
1986 }
1987
/// Exercises `check_contains` between two group types: matching groups and
/// field subsets are contained; differing names, repetitions, child types or
/// unknown children are not.
#[test]
fn test_check_contains_group_group() {
    // Primitive child with default repetition.
    let leaf = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };
    // REPEATED group named "f" with the given children.
    let repeated_f = |children: Vec<Type>| test_new_group_type("f", Repetition::REPEATED, children);
    // The reference group used by most cases: f { f1: INT32, f2: INT64 }.
    let reference_group = || {
        repeated_f(vec![
            leaf("f1", PhysicalType::INT32),
            leaf("f2", PhysicalType::INT64),
        ])
    };

    // Two empty groups with the same name match; a group built without an
    // explicit repetition is not reported as optional.
    let lhs = Type::group_type_builder("f").build().unwrap();
    let rhs = Type::group_type_builder("f").build().unwrap();
    assert!(lhs.check_contains(&rhs));
    assert!(!lhs.is_optional());

    // Identical children on both sides.
    let lhs = reference_group();
    let rhs = reference_group();
    assert!(lhs.check_contains(&rhs));

    // The right-hand side holds only a subset of the left-hand children.
    let lhs = reference_group();
    let rhs = repeated_f(vec![leaf("f2", PhysicalType::INT64)]);
    assert!(lhs.check_contains(&rhs));

    // Different group names.
    let lhs = Type::group_type_builder("f1").build().unwrap();
    let rhs = Type::group_type_builder("f2").build().unwrap();
    assert!(!lhs.check_contains(&rhs));

    // Different group repetitions.
    let lhs = Type::group_type_builder("f")
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    let rhs = Type::group_type_builder("f")
        .with_repetition(Repetition::REPEATED)
        .build()
        .unwrap();
    assert!(!lhs.check_contains(&rhs));

    // A child with the same name but a different physical type.
    let lhs = reference_group();
    let rhs = repeated_f(vec![
        leaf("f1", PhysicalType::INT32),
        leaf("f2", PhysicalType::BOOLEAN),
    ]);
    assert!(!lhs.check_contains(&rhs));

    // A child that does not exist on the left-hand side.
    let lhs = reference_group();
    let rhs = repeated_f(vec![leaf("f3", PhysicalType::INT32)]);
    assert!(!lhs.check_contains(&rhs));
}
2110
/// Exercises `check_contains` across group and primitive types: a group and a
/// primitive never contain each other, while a nested group contains a
/// structurally smaller group but not the other way round.
#[test]
fn test_check_contains_group_primitive() {
    // Primitive with default repetition.
    let leaf = |name: &str, physical: PhysicalType| {
        Type::primitive_type_builder(name, physical)
            .build()
            .unwrap()
    };

    // Same name, but one side is a group and the other a primitive.
    let group = Type::group_type_builder("f").build().unwrap();
    let primitive = leaf("f", PhysicalType::INT64);
    assert!(!group.check_contains(&primitive));
    assert!(!primitive.check_contains(&group));

    // A primitive equal to the group's child is still not contained by the
    // group itself, nor does it contain the group.
    let group = test_new_group_type(
        "f",
        Repetition::REPEATED,
        vec![leaf("f1", PhysicalType::INT32)],
    );
    let primitive = leaf("f1", PhysicalType::INT32);
    assert!(!group.check_contains(&primitive));
    assert!(!primitive.check_contains(&group));

    // Nested case: a { b { c }, d, e } versus a { b { c } }.
    let b_group = || {
        test_new_group_type(
            "b",
            Repetition::REPEATED,
            vec![leaf("c", PhysicalType::INT32)],
        )
    };
    let wide = test_new_group_type(
        "a",
        Repetition::REPEATED,
        vec![
            b_group(),
            leaf("d", PhysicalType::INT64),
            leaf("e", PhysicalType::BOOLEAN),
        ],
    );
    let narrow = test_new_group_type("a", Repetition::REPEATED, vec![b_group()]);

    // The wider schema contains the narrower one, but not vice versa.
    assert!(wide.check_contains(&narrow));
    assert!(!narrow.check_contains(&wide));
}
2169
/// Serializing a schema whose root is a primitive type must fail with the
/// "Root schema must be Group type" error.
#[test]
fn test_schema_type_thrift_conversion_err() {
    let primitive_root = Arc::new(
        Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap(),
    );

    let outcome = schema_to_buf(&primitive_root);
    assert!(outcome.is_err());

    // Compare the rendered error text against the expected message.
    let message = outcome.err().map(|e| e.to_string());
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Root schema must be Group type")
    );
}
2185
/// Round-trips a schema containing nested lists, maps and structs through
/// `roundtrip_schema` and expects the result to equal the parsed original.
#[test]
fn test_schema_type_thrift_conversion() {
    let message_type = "
    message conversions {
      REQUIRED INT64 id;
      OPTIONAL FIXED_LEN_BYTE_ARRAY (2) f16 (FLOAT16);
      OPTIONAL group int_array_Array (LIST) {
        REPEATED group list {
          OPTIONAL group element (LIST) {
            REPEATED group list {
              OPTIONAL INT32 element;
            }
          }
        }
      }
      OPTIONAL group int_map (MAP) {
        REPEATED group map (MAP_KEY_VALUE) {
          REQUIRED BYTE_ARRAY key (UTF8);
          OPTIONAL INT32 value;
        }
      }
      OPTIONAL group int_Map_Array (LIST) {
        REPEATED group list {
          OPTIONAL group g (MAP) {
            REPEATED group map (MAP_KEY_VALUE) {
              REQUIRED BYTE_ARRAY key (UTF8);
              OPTIONAL group value {
                OPTIONAL group H {
                  OPTIONAL group i (LIST) {
                    REPEATED group list {
                      OPTIONAL DOUBLE element;
                    }
                  }
                }
              }
            }
          }
        }
      }
      OPTIONAL group nested_struct {
        OPTIONAL INT32 A;
        OPTIONAL group b (LIST) {
          REPEATED group list {
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) element;
          }
        }
      }
    }
    ";
    let expected = Arc::new(parse_message_type(message_type).unwrap());

    // Hand a shared handle to the round trip and keep ours for comparison.
    let round_tripped = roundtrip_schema(Arc::clone(&expected)).unwrap();
    assert_eq!(round_tripped, expected);
}
2239
/// Round-trips a schema with DECIMAL-annotated INT64, FIXED_LEN_BYTE_ARRAY
/// and BYTE_ARRAY fields through `roundtrip_schema` and expects the result to
/// equal the parsed original.
#[test]
fn test_schema_type_thrift_conversion_decimal() {
    let message_type = "
    message decimals {
      OPTIONAL INT32 field0;
      OPTIONAL INT64 field1 (DECIMAL (18, 2));
      OPTIONAL FIXED_LEN_BYTE_ARRAY (16) field2 (DECIMAL (38, 18));
      OPTIONAL BYTE_ARRAY field3 (DECIMAL (9));
    }
    ";
    let expected = Arc::new(parse_message_type(message_type).unwrap());

    // Hand a shared handle to the round trip and keep ours for comparison.
    let round_tripped = roundtrip_schema(Arc::clone(&expected)).unwrap();
    assert_eq!(round_tripped, expected);
}
2254
/// Decoding must tolerate schema elements whose `num_children` is set to
/// `Some(0)` where the serialized form left it unset: the rebuilt schema is
/// expected to equal the original.
#[test]
fn test_schema_from_thrift_with_num_children_set() {
    let message_type = "
    message schema {
      OPTIONAL BYTE_ARRAY id (UTF8);
      OPTIONAL BYTE_ARRAY name (UTF8);
      OPTIONAL BYTE_ARRAY message (UTF8);
      OPTIONAL INT32 type (UINT_8);
      OPTIONAL INT64 author_time (TIMESTAMP_MILLIS);
      OPTIONAL INT64 __index_level_0__;
    }
    ";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut encoded = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut encoded).unwrap();

    // Fill in an explicit zero wherever the child count was left unset.
    elements[..]
        .iter_mut()
        .filter(|element| element.num_children.is_none())
        .for_each(|element| element.num_children = Some(0));

    let decoded = parquet_schema_from_array(elements).unwrap();
    assert_eq!(decoded, expected);
}
2285
/// Decoding must tolerate a root schema element that carries a repetition
/// type: the rebuilt schema is expected to equal the original.
#[test]
fn test_schema_from_thrift_root_has_repetition() {
    let message_type = "
    message schema {
      OPTIONAL BYTE_ARRAY a (UTF8);
      OPTIONAL INT32 b (UINT_8);
    }
    ";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut encoded = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut encoded).unwrap();

    // Force a repetition onto the root element before decoding.
    let root = &mut elements[0];
    root.repetition_type = Some(Repetition::REQUIRED);

    let decoded = parquet_schema_from_array(elements).unwrap();
    assert_eq!(decoded, expected);
}
2306
/// Decoding must handle a root group with no children (and with a repetition
/// type forced onto it): the rebuilt schema is expected to equal the original.
#[test]
fn test_schema_from_thrift_group_has_no_child() {
    let message_type = "message schema {}";

    let expected = Arc::new(parse_message_type(message_type).unwrap());
    let mut encoded = schema_to_buf(&expected).unwrap();
    let mut elements = buf_to_schema_list(&mut encoded).unwrap();

    // Force a repetition onto the root element before decoding.
    let root = &mut elements[0];
    root.repetition_type = Some(Repetition::REQUIRED);

    let decoded = parquet_schema_from_array(elements).unwrap();
    assert_eq!(decoded, expected);
}
2319}