1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32 let info = t.get_basic_info();
33 match info.has_repetition() {
34 true => info.repetition(),
35 false => Repetition::REQUIRED,
36 }
37}
38
/// A node of the parquet schema paired with the arrow type it converts to, together with
/// the repetition/definition levels the schema visitor computed for it.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level assigned to this field by the schema visitor.
    pub rep_level: i16,
    /// Definition level assigned to this field by the schema visitor.
    pub def_level: i16,
    /// Whether the arrow field produced for this node is nullable.
    pub nullable: bool,
    /// The arrow type this node is converted to.
    pub arrow_type: DataType,
    /// Whether this node is a leaf column, a group of children, or a virtual column.
    pub field_type: ParquetFieldType,
}
59
60impl ParquetField {
61 fn into_list(self, name: &str) -> Self {
65 ParquetField {
66 rep_level: self.rep_level,
67 def_level: self.def_level,
68 nullable: false,
69 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70 field_type: ParquetFieldType::Group {
71 children: vec![self],
72 },
73 }
74 }
75
76 pub fn children(&self) -> Option<&[Self]> {
78 match &self.field_type {
79 ParquetFieldType::Primitive { .. } => None,
80 ParquetFieldType::Group { children } => Some(children),
81 ParquetFieldType::Virtual(_) => None,
82 }
83 }
84}
85
/// Kind of a virtual column, chosen from the arrow extension type of the requested field
/// (see `convert_virtual_field`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VirtualColumnType {
    /// Selected by the `RowNumber` extension type (presumably the row's position — confirm
    /// against the reader that materializes it).
    RowNumber,
    /// Selected by the `RowGroupIndex` extension type (presumably the index of the row group
    /// the row came from — confirm against the reader).
    RowGroupIndex,
}
94
/// What a [`ParquetField`] represents.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf (primitive) parquet column.
    Primitive {
        /// Index of this leaf among all leaf columns, counted in schema visit order
        /// (leaves excluded by a projection still consume an index).
        col_idx: usize,
        /// The parquet type of the leaf.
        primitive_type: TypePtr,
    },
    /// A group node (struct, list or map) holding its converted, projected children.
    Group {
        children: Vec<ParquetField>,
    },
    /// A column with no backing parquet leaf, identified by its kind.
    Virtual(VirtualColumnType),
}
110
/// State handed from a parent node to the child being visited.
struct VisitorContext {
    /// Repetition level of the parent.
    rep_level: i16,
    /// Definition level of the parent.
    def_level: i16,
    /// Optional arrow type hint for the child, e.g. taken from an embedded arrow schema.
    data_type: Option<DataType>,
}
118
119impl VisitorContext {
120 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
123 match repetition {
124 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
125 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
126 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
127 }
128 }
129}
130
/// Recursive walker that converts a parquet schema into a [`ParquetField`] tree,
/// numbering leaf columns in visit order and keeping only the leaves selected by `mask`.
struct Visitor {
    /// Column index that will be assigned to the next primitive (leaf) type visited.
    next_col_idx: usize,

    /// Selects which leaf columns appear in the output.
    mask: ProjectionMask,
}
143
144impl Visitor {
145 fn visit_primitive(
146 &mut self,
147 primitive_type: &TypePtr,
148 context: VisitorContext,
149 ) -> Result<Option<ParquetField>> {
150 let col_idx = self.next_col_idx;
151 self.next_col_idx += 1;
152
153 if !self.mask.leaf_included(col_idx) {
154 return Ok(None);
155 }
156
157 let repetition = get_repetition(primitive_type);
158 let (def_level, rep_level, nullable) = context.levels(repetition);
159
160 let arrow_type = convert_primitive(primitive_type, context.data_type)?;
161
162 let primitive_field = ParquetField {
163 rep_level,
164 def_level,
165 nullable,
166 arrow_type,
167 field_type: ParquetFieldType::Primitive {
168 primitive_type: primitive_type.clone(),
169 col_idx,
170 },
171 };
172
173 Ok(Some(match repetition {
174 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
175 _ => primitive_field,
176 }))
177 }
178
179 fn visit_struct(
180 &mut self,
181 struct_type: &TypePtr,
182 context: VisitorContext,
183 ) -> Result<Option<ParquetField>> {
184 let repetition = get_repetition(struct_type);
186 let (def_level, rep_level, nullable) = context.levels(repetition);
187
188 let parquet_fields = struct_type.get_fields();
189
190 let arrow_fields = match &context.data_type {
192 Some(DataType::Struct(fields)) => {
193 if fields.len() != parquet_fields.len() {
194 return Err(arrow_err!(
195 "incompatible arrow schema, expected {} struct fields got {}",
196 parquet_fields.len(),
197 fields.len()
198 ));
199 }
200 Some(fields)
201 }
202 Some(d) => {
203 return Err(arrow_err!(
204 "incompatible arrow schema, expected struct got {}",
205 d
206 ));
207 }
208 None => None,
209 };
210
211 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
212 let mut children = Vec::with_capacity(parquet_fields.len());
213
214 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
216 let data_type = match arrow_fields {
217 Some(fields) => {
218 let field = &fields[idx];
219 if field.name() != parquet_field.name() {
220 return Err(arrow_err!(
221 "incompatible arrow schema, expected field named {} got {}",
222 parquet_field.name(),
223 field.name()
224 ));
225 }
226 Some(field.data_type().clone())
227 }
228 None => None,
229 };
230
231 let arrow_field = arrow_fields.map(|x| &*x[idx]);
232 let child_ctx = VisitorContext {
233 rep_level,
234 def_level,
235 data_type,
236 };
237
238 if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
239 child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
242 children.push(child);
243 }
244 }
245
246 if children.is_empty() {
247 return Ok(None);
248 }
249
250 let struct_field = ParquetField {
251 rep_level,
252 def_level,
253 nullable,
254 arrow_type: DataType::Struct(child_fields.finish().fields),
255 field_type: ParquetFieldType::Group { children },
256 };
257
258 Ok(Some(match repetition {
259 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
260 _ => struct_field,
261 }))
262 }
263
264 fn visit_map(
265 &mut self,
266 map_type: &TypePtr,
267 context: VisitorContext,
268 ) -> Result<Option<ParquetField>> {
269 let rep_level = context.rep_level + 1;
270 let (def_level, nullable) = match get_repetition(map_type) {
271 Repetition::REQUIRED => (context.def_level + 1, false),
272 Repetition::OPTIONAL => (context.def_level + 2, true),
273 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
274 };
275
276 if map_type.get_fields().len() != 1 {
277 return Err(arrow_err!(
278 "Map field must have exactly one key_value child, found {}",
279 map_type.get_fields().len()
280 ));
281 }
282
283 let map_key_value = &map_type.get_fields()[0];
285 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
286 return Err(arrow_err!("Child of map field must be repeated"));
287 }
288
289 if map_key_value.get_fields().len() == 1 {
292 return self.visit_list(map_type, context);
293 }
294
295 if map_key_value.get_fields().len() != 2 {
296 return Err(arrow_err!(
297 "Child of map field must have two children, found {}",
298 map_key_value.get_fields().len()
299 ));
300 }
301
302 let map_key = &map_key_value.get_fields()[0];
304 let map_value = &map_key_value.get_fields()[1];
305
306 match map_key.get_basic_info().repetition() {
307 Repetition::REPEATED => {
308 return Err(arrow_err!("Map keys cannot be repeated"));
309 }
310 Repetition::REQUIRED | Repetition::OPTIONAL => {
311 }
316 }
317
318 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
319 return Err(arrow_err!("Map values cannot be repeated"));
320 }
321
322 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
324 Some(DataType::Map(field, sorted)) => match field.data_type() {
325 DataType::Struct(fields) => {
326 if fields.len() != 2 {
327 return Err(arrow_err!(
328 "Map data type should contain struct with two children, got {}",
329 fields.len()
330 ));
331 }
332
333 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
334 }
335 d => {
336 return Err(arrow_err!("Map data type should contain struct got {}", d));
337 }
338 },
339 Some(d) => {
340 return Err(arrow_err!(
341 "incompatible arrow schema, expected map got {}",
342 d
343 ));
344 }
345 None => (None, None, None, false),
346 };
347
348 let maybe_key = {
349 let context = VisitorContext {
350 rep_level,
351 def_level,
352 data_type: arrow_key.map(|x| x.data_type().clone()),
353 };
354
355 self.dispatch(map_key, context)?
356 };
357
358 let maybe_value = {
359 let context = VisitorContext {
360 rep_level,
361 def_level,
362 data_type: arrow_value.map(|x| x.data_type().clone()),
363 };
364
365 self.dispatch(map_value, context)?
366 };
367
368 match (maybe_key, maybe_value) {
370 (Some(mut key), Some(mut value)) => {
371 let key_field = Arc::new(
372 convert_field(map_key, &mut key, arrow_key)?
373 .with_nullable(false),
375 );
376 let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
377 let field_metadata = match arrow_map {
378 Some(field) => field.metadata().clone(),
379 _ => HashMap::default(),
380 };
381
382 let map_field = Field::new_struct(
383 map_key_value.name(),
384 [key_field, value_field],
385 false, )
387 .with_metadata(field_metadata);
388
389 Ok(Some(ParquetField {
390 rep_level,
391 def_level,
392 nullable,
393 arrow_type: DataType::Map(Arc::new(map_field), sorted),
394 field_type: ParquetFieldType::Group {
395 children: vec![key, value],
396 },
397 }))
398 }
399 _ => Ok(None),
400 }
401 }
402
403 fn visit_list(
404 &mut self,
405 list_type: &TypePtr,
406 context: VisitorContext,
407 ) -> Result<Option<ParquetField>> {
408 if list_type.is_primitive() {
409 return Err(arrow_err!(
410 "{:?} is a list type and can't be processed as primitive.",
411 list_type
412 ));
413 }
414
415 let fields = list_type.get_fields();
416 if fields.len() != 1 {
417 return Err(arrow_err!(
418 "list type must have a single child, found {}",
419 fields.len()
420 ));
421 }
422
423 let repeated_field = &fields[0];
424 if get_repetition(repeated_field) != Repetition::REPEATED {
425 return Err(arrow_err!("List child must be repeated"));
426 }
427
428 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
430 Repetition::REQUIRED => (context.def_level, false),
431 Repetition::OPTIONAL => (context.def_level + 1, true),
432 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
433 };
434
435 let arrow_field = match &context.data_type {
436 Some(DataType::List(f)) => Some(f.as_ref()),
437 Some(DataType::LargeList(f)) => Some(f.as_ref()),
438 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
439 Some(d) => {
440 return Err(arrow_err!(
441 "incompatible arrow schema, expected list got {}",
442 d
443 ));
444 }
445 None => None,
446 };
447
448 if repeated_field.is_primitive() {
449 let context = VisitorContext {
456 rep_level: context.rep_level,
457 def_level,
458 data_type: arrow_field.map(|f| f.data_type().clone()),
459 };
460
461 return match self.visit_primitive(repeated_field, context) {
462 Ok(Some(mut field)) => {
463 field.nullable = nullable;
465 Ok(Some(field))
466 }
467 r => r,
468 };
469 }
470
471 let items = repeated_field.get_fields();
473 if items.len() != 1
474 || (!repeated_field.is_list()
475 && !repeated_field.has_single_repeated_child()
476 && (repeated_field.name() == "array"
477 || repeated_field.name() == format!("{}_tuple", list_type.name())))
478 {
479 let context = VisitorContext {
487 rep_level: context.rep_level,
488 def_level,
489 data_type: arrow_field.map(|f| f.data_type().clone()),
490 };
491
492 return match self.visit_struct(repeated_field, context) {
493 Ok(Some(mut field)) => {
494 field.nullable = nullable;
495 Ok(Some(field))
496 }
497 r => r,
498 };
499 }
500
501 let item_type = &items[0];
503 let rep_level = context.rep_level + 1;
504 let def_level = def_level + 1;
505
506 let new_context = VisitorContext {
507 def_level,
508 rep_level,
509 data_type: arrow_field.map(|f| f.data_type().clone()),
510 };
511
512 match self.dispatch(item_type, new_context) {
513 Ok(Some(mut item)) => {
514 let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
515
516 let arrow_type = match context.data_type {
518 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
519 Some(DataType::FixedSizeList(_, len)) => {
520 DataType::FixedSizeList(item_field, len)
521 }
522 _ => DataType::List(item_field),
523 };
524
525 Ok(Some(ParquetField {
526 rep_level,
527 def_level,
528 nullable,
529 arrow_type,
530 field_type: ParquetFieldType::Group {
531 children: vec![item],
532 },
533 }))
534 }
535 r => r,
536 }
537 }
538
539 fn dispatch(
540 &mut self,
541 cur_type: &TypePtr,
542 context: VisitorContext,
543 ) -> Result<Option<ParquetField>> {
544 if cur_type.is_primitive() {
545 self.visit_primitive(cur_type, context)
546 } else {
547 match cur_type.get_basic_info().converted_type() {
548 ConvertedType::LIST => self.visit_list(cur_type, context),
549 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
550 self.visit_map(cur_type, context)
551 }
552 _ => self.visit_struct(cur_type, context),
553 }
554 }
555 }
556}
557
558pub(super) fn convert_virtual_field(
568 arrow_field: &Field,
569 parent_rep_level: i16,
570 parent_def_level: i16,
571) -> Result<ParquetField> {
572 let nullable = arrow_field.is_nullable();
573 let def_level = if nullable {
574 parent_def_level + 1
575 } else {
576 parent_def_level
577 };
578
579 let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
581 ParquetError::ArrowError(format!(
582 "virtual column field '{}' must have an extension type",
583 arrow_field.name()
584 ))
585 })?;
586
587 let virtual_type = match extension_name {
588 RowNumber::NAME => VirtualColumnType::RowNumber,
589 RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
590 _ => {
591 return Err(ParquetError::ArrowError(format!(
592 "unsupported virtual column type '{}' for field '{}'",
593 extension_name,
594 arrow_field.name()
595 )));
596 }
597 };
598
599 Ok(ParquetField {
600 rep_level: parent_rep_level,
601 def_level,
602 nullable,
603 arrow_type: arrow_field.data_type().clone(),
604 field_type: ParquetFieldType::Virtual(virtual_type),
605 })
606}
607
608fn convert_field(
613 parquet_type: &Type,
614 field: &mut ParquetField,
615 arrow_hint: Option<&Field>,
616) -> Result<Field, ParquetError> {
617 let name = parquet_type.name();
618 let data_type = field.arrow_type.clone();
619 let nullable = field.nullable;
620
621 match arrow_hint {
622 Some(hint) => {
623 #[allow(deprecated)]
625 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
626 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
627 {
628 #[allow(deprecated)]
629 Field::new_dict(name, data_type, nullable, id, ordered)
630 }
631 _ => Field::new(name, data_type, nullable),
632 };
633
634 Ok(field.with_metadata(hint.metadata().clone()))
635 }
636 None => {
637 let mut ret = Field::new(name, data_type, nullable);
638 let basic_info = parquet_type.get_basic_info();
639 if basic_info.has_id() {
640 let mut meta = HashMap::with_capacity(1);
641 meta.insert(
642 PARQUET_FIELD_ID_META_KEY.to_string(),
643 basic_info.id().to_string(),
644 );
645 ret.set_metadata(meta);
646 }
647 try_add_extension_type(ret, parquet_type)
648 }
649 }
650}
651
652pub fn convert_schema(
658 schema: &SchemaDescriptor,
659 mask: ProjectionMask,
660 embedded_arrow_schema: Option<&Fields>,
661) -> Result<Option<ParquetField>> {
662 let mut visitor = Visitor {
663 next_col_idx: 0,
664 mask,
665 };
666
667 let context = VisitorContext {
668 rep_level: 0,
669 def_level: 0,
670 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
671 };
672
673 visitor.dispatch(&schema.root_schema_ptr(), context)
674}
675
676pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
678 let mut visitor = Visitor {
679 next_col_idx: 0,
680 mask: ProjectionMask::all(),
681 };
682
683 let context = VisitorContext {
684 rep_level: 0,
685 def_level: 0,
686 data_type: None,
687 };
688
689 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
690}