1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::RowNumber;
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32 let info = t.get_basic_info();
33 match info.has_repetition() {
34 true => info.repetition(),
35 false => Repetition::REQUIRED,
36 }
37}
38
39#[derive(Debug, Clone)]
41pub struct ParquetField {
42 pub rep_level: i16,
45 pub def_level: i16,
49 pub nullable: bool,
51 pub arrow_type: DataType,
56 pub field_type: ParquetFieldType,
58}
59
60impl ParquetField {
61 fn into_list(self, name: &str) -> Self {
65 ParquetField {
66 rep_level: self.rep_level,
67 def_level: self.def_level,
68 nullable: false,
69 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70 field_type: ParquetFieldType::Group {
71 children: vec![self],
72 },
73 }
74 }
75
76 pub fn children(&self) -> Option<&[Self]> {
78 match &self.field_type {
79 ParquetFieldType::Primitive { .. } => None,
80 ParquetFieldType::Group { children } => Some(children),
81 ParquetFieldType::Virtual(_) => None,
82 }
83 }
84}
85
/// The kinds of virtual column that can be attached to a [`ParquetField`].
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so this fieldless `Copy`
/// enum can be used as a key in hash maps and sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VirtualColumnType {
    /// A row-number column.
    RowNumber,
}
92
93#[derive(Debug, Clone)]
94pub enum ParquetFieldType {
95 Primitive {
96 col_idx: usize,
98 primitive_type: TypePtr,
100 },
101 Group {
102 children: Vec<ParquetField>,
103 },
104 Virtual(VirtualColumnType),
107}
108
109struct VisitorContext {
111 rep_level: i16,
112 def_level: i16,
113 data_type: Option<DataType>,
115}
116
117impl VisitorContext {
118 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
121 match repetition {
122 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
123 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
124 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
125 }
126 }
127}
128
129struct Visitor {
135 next_col_idx: usize,
137
138 mask: ProjectionMask,
140}
141
142impl Visitor {
143 fn visit_primitive(
144 &mut self,
145 primitive_type: &TypePtr,
146 context: VisitorContext,
147 ) -> Result<Option<ParquetField>> {
148 let col_idx = self.next_col_idx;
149 self.next_col_idx += 1;
150
151 if !self.mask.leaf_included(col_idx) {
152 return Ok(None);
153 }
154
155 let repetition = get_repetition(primitive_type);
156 let (def_level, rep_level, nullable) = context.levels(repetition);
157
158 let arrow_type = convert_primitive(primitive_type, context.data_type)?;
159
160 let primitive_field = ParquetField {
161 rep_level,
162 def_level,
163 nullable,
164 arrow_type,
165 field_type: ParquetFieldType::Primitive {
166 primitive_type: primitive_type.clone(),
167 col_idx,
168 },
169 };
170
171 Ok(Some(match repetition {
172 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
173 _ => primitive_field,
174 }))
175 }
176
177 fn visit_struct(
178 &mut self,
179 struct_type: &TypePtr,
180 context: VisitorContext,
181 ) -> Result<Option<ParquetField>> {
182 let repetition = get_repetition(struct_type);
184 let (def_level, rep_level, nullable) = context.levels(repetition);
185
186 let parquet_fields = struct_type.get_fields();
187
188 let arrow_fields = match &context.data_type {
190 Some(DataType::Struct(fields)) => {
191 if fields.len() != parquet_fields.len() {
192 return Err(arrow_err!(
193 "incompatible arrow schema, expected {} struct fields got {}",
194 parquet_fields.len(),
195 fields.len()
196 ));
197 }
198 Some(fields)
199 }
200 Some(d) => {
201 return Err(arrow_err!(
202 "incompatible arrow schema, expected struct got {}",
203 d
204 ));
205 }
206 None => None,
207 };
208
209 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
210 let mut children = Vec::with_capacity(parquet_fields.len());
211
212 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
214 let data_type = match arrow_fields {
215 Some(fields) => {
216 let field = &fields[idx];
217 if field.name() != parquet_field.name() {
218 return Err(arrow_err!(
219 "incompatible arrow schema, expected field named {} got {}",
220 parquet_field.name(),
221 field.name()
222 ));
223 }
224 Some(field.data_type().clone())
225 }
226 None => None,
227 };
228
229 let arrow_field = arrow_fields.map(|x| &*x[idx]);
230 let child_ctx = VisitorContext {
231 rep_level,
232 def_level,
233 data_type,
234 };
235
236 if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
237 child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
240 children.push(child);
241 }
242 }
243
244 if children.is_empty() {
245 return Ok(None);
246 }
247
248 let struct_field = ParquetField {
249 rep_level,
250 def_level,
251 nullable,
252 arrow_type: DataType::Struct(child_fields.finish().fields),
253 field_type: ParquetFieldType::Group { children },
254 };
255
256 Ok(Some(match repetition {
257 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
258 _ => struct_field,
259 }))
260 }
261
262 fn visit_map(
263 &mut self,
264 map_type: &TypePtr,
265 context: VisitorContext,
266 ) -> Result<Option<ParquetField>> {
267 let rep_level = context.rep_level + 1;
268 let (def_level, nullable) = match get_repetition(map_type) {
269 Repetition::REQUIRED => (context.def_level + 1, false),
270 Repetition::OPTIONAL => (context.def_level + 2, true),
271 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
272 };
273
274 if map_type.get_fields().len() != 1 {
275 return Err(arrow_err!(
276 "Map field must have exactly one key_value child, found {}",
277 map_type.get_fields().len()
278 ));
279 }
280
281 let map_key_value = &map_type.get_fields()[0];
283 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
284 return Err(arrow_err!("Child of map field must be repeated"));
285 }
286
287 if map_key_value.get_fields().len() == 1 {
290 return self.visit_list(map_type, context);
291 }
292
293 if map_key_value.get_fields().len() != 2 {
294 return Err(arrow_err!(
295 "Child of map field must have two children, found {}",
296 map_key_value.get_fields().len()
297 ));
298 }
299
300 let map_key = &map_key_value.get_fields()[0];
302 let map_value = &map_key_value.get_fields()[1];
303
304 match map_key.get_basic_info().repetition() {
305 Repetition::REPEATED => {
306 return Err(arrow_err!("Map keys cannot be repeated"));
307 }
308 Repetition::REQUIRED | Repetition::OPTIONAL => {
309 }
314 }
315
316 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
317 return Err(arrow_err!("Map values cannot be repeated"));
318 }
319
320 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
322 Some(DataType::Map(field, sorted)) => match field.data_type() {
323 DataType::Struct(fields) => {
324 if fields.len() != 2 {
325 return Err(arrow_err!(
326 "Map data type should contain struct with two children, got {}",
327 fields.len()
328 ));
329 }
330
331 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
332 }
333 d => {
334 return Err(arrow_err!("Map data type should contain struct got {}", d));
335 }
336 },
337 Some(d) => {
338 return Err(arrow_err!(
339 "incompatible arrow schema, expected map got {}",
340 d
341 ));
342 }
343 None => (None, None, None, false),
344 };
345
346 let maybe_key = {
347 let context = VisitorContext {
348 rep_level,
349 def_level,
350 data_type: arrow_key.map(|x| x.data_type().clone()),
351 };
352
353 self.dispatch(map_key, context)?
354 };
355
356 let maybe_value = {
357 let context = VisitorContext {
358 rep_level,
359 def_level,
360 data_type: arrow_value.map(|x| x.data_type().clone()),
361 };
362
363 self.dispatch(map_value, context)?
364 };
365
366 match (maybe_key, maybe_value) {
368 (Some(mut key), Some(mut value)) => {
369 let key_field = Arc::new(
370 convert_field(map_key, &mut key, arrow_key)?
371 .with_nullable(false),
373 );
374 let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
375 let field_metadata = match arrow_map {
376 Some(field) => field.metadata().clone(),
377 _ => HashMap::default(),
378 };
379
380 let map_field = Field::new_struct(
381 map_key_value.name(),
382 [key_field, value_field],
383 false, )
385 .with_metadata(field_metadata);
386
387 Ok(Some(ParquetField {
388 rep_level,
389 def_level,
390 nullable,
391 arrow_type: DataType::Map(Arc::new(map_field), sorted),
392 field_type: ParquetFieldType::Group {
393 children: vec![key, value],
394 },
395 }))
396 }
397 _ => Ok(None),
398 }
399 }
400
401 fn visit_list(
402 &mut self,
403 list_type: &TypePtr,
404 context: VisitorContext,
405 ) -> Result<Option<ParquetField>> {
406 if list_type.is_primitive() {
407 return Err(arrow_err!(
408 "{:?} is a list type and can't be processed as primitive.",
409 list_type
410 ));
411 }
412
413 let fields = list_type.get_fields();
414 if fields.len() != 1 {
415 return Err(arrow_err!(
416 "list type must have a single child, found {}",
417 fields.len()
418 ));
419 }
420
421 let repeated_field = &fields[0];
422 if get_repetition(repeated_field) != Repetition::REPEATED {
423 return Err(arrow_err!("List child must be repeated"));
424 }
425
426 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
428 Repetition::REQUIRED => (context.def_level, false),
429 Repetition::OPTIONAL => (context.def_level + 1, true),
430 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
431 };
432
433 let arrow_field = match &context.data_type {
434 Some(DataType::List(f)) => Some(f.as_ref()),
435 Some(DataType::LargeList(f)) => Some(f.as_ref()),
436 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
437 Some(d) => {
438 return Err(arrow_err!(
439 "incompatible arrow schema, expected list got {}",
440 d
441 ));
442 }
443 None => None,
444 };
445
446 if repeated_field.is_primitive() {
447 let context = VisitorContext {
454 rep_level: context.rep_level,
455 def_level,
456 data_type: arrow_field.map(|f| f.data_type().clone()),
457 };
458
459 return match self.visit_primitive(repeated_field, context) {
460 Ok(Some(mut field)) => {
461 field.nullable = nullable;
463 Ok(Some(field))
464 }
465 r => r,
466 };
467 }
468
469 let items = repeated_field.get_fields();
471 if items.len() != 1
472 || (!repeated_field.is_list()
473 && !repeated_field.has_single_repeated_child()
474 && (repeated_field.name() == "array"
475 || repeated_field.name() == format!("{}_tuple", list_type.name())))
476 {
477 let context = VisitorContext {
485 rep_level: context.rep_level,
486 def_level,
487 data_type: arrow_field.map(|f| f.data_type().clone()),
488 };
489
490 return match self.visit_struct(repeated_field, context) {
491 Ok(Some(mut field)) => {
492 field.nullable = nullable;
493 Ok(Some(field))
494 }
495 r => r,
496 };
497 }
498
499 let item_type = &items[0];
501 let rep_level = context.rep_level + 1;
502 let def_level = def_level + 1;
503
504 let new_context = VisitorContext {
505 def_level,
506 rep_level,
507 data_type: arrow_field.map(|f| f.data_type().clone()),
508 };
509
510 match self.dispatch(item_type, new_context) {
511 Ok(Some(mut item)) => {
512 let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
513
514 let arrow_type = match context.data_type {
516 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
517 Some(DataType::FixedSizeList(_, len)) => {
518 DataType::FixedSizeList(item_field, len)
519 }
520 _ => DataType::List(item_field),
521 };
522
523 Ok(Some(ParquetField {
524 rep_level,
525 def_level,
526 nullable,
527 arrow_type,
528 field_type: ParquetFieldType::Group {
529 children: vec![item],
530 },
531 }))
532 }
533 r => r,
534 }
535 }
536
537 fn dispatch(
538 &mut self,
539 cur_type: &TypePtr,
540 context: VisitorContext,
541 ) -> Result<Option<ParquetField>> {
542 if cur_type.is_primitive() {
543 self.visit_primitive(cur_type, context)
544 } else {
545 match cur_type.get_basic_info().converted_type() {
546 ConvertedType::LIST => self.visit_list(cur_type, context),
547 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
548 self.visit_map(cur_type, context)
549 }
550 _ => self.visit_struct(cur_type, context),
551 }
552 }
553 }
554}
555
556pub(super) fn convert_virtual_field(
566 arrow_field: &Field,
567 parent_rep_level: i16,
568 parent_def_level: i16,
569) -> Result<ParquetField> {
570 let nullable = arrow_field.is_nullable();
571 let def_level = if nullable {
572 parent_def_level + 1
573 } else {
574 parent_def_level
575 };
576
577 let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
579 ParquetError::ArrowError(format!(
580 "virtual column field '{}' must have an extension type",
581 arrow_field.name()
582 ))
583 })?;
584
585 let virtual_type = match extension_name {
586 RowNumber::NAME => VirtualColumnType::RowNumber,
587 _ => {
588 return Err(ParquetError::ArrowError(format!(
589 "unsupported virtual column type '{}' for field '{}'",
590 extension_name,
591 arrow_field.name()
592 )));
593 }
594 };
595
596 Ok(ParquetField {
597 rep_level: parent_rep_level,
598 def_level,
599 nullable,
600 arrow_type: arrow_field.data_type().clone(),
601 field_type: ParquetFieldType::Virtual(virtual_type),
602 })
603}
604
605fn convert_field(
610 parquet_type: &Type,
611 field: &mut ParquetField,
612 arrow_hint: Option<&Field>,
613) -> Result<Field, ParquetError> {
614 let name = parquet_type.name();
615 let data_type = field.arrow_type.clone();
616 let nullable = field.nullable;
617
618 match arrow_hint {
619 Some(hint) => {
620 #[allow(deprecated)]
622 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
623 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
624 {
625 #[allow(deprecated)]
626 Field::new_dict(name, data_type, nullable, id, ordered)
627 }
628 _ => Field::new(name, data_type, nullable),
629 };
630
631 Ok(field.with_metadata(hint.metadata().clone()))
632 }
633 None => {
634 let mut ret = Field::new(name, data_type, nullable);
635 let basic_info = parquet_type.get_basic_info();
636 if basic_info.has_id() {
637 let mut meta = HashMap::with_capacity(1);
638 meta.insert(
639 PARQUET_FIELD_ID_META_KEY.to_string(),
640 basic_info.id().to_string(),
641 );
642 ret.set_metadata(meta);
643 }
644 try_add_extension_type(ret, parquet_type)
645 }
646 }
647}
648
649pub fn convert_schema(
655 schema: &SchemaDescriptor,
656 mask: ProjectionMask,
657 embedded_arrow_schema: Option<&Fields>,
658) -> Result<Option<ParquetField>> {
659 let mut visitor = Visitor {
660 next_col_idx: 0,
661 mask,
662 };
663
664 let context = VisitorContext {
665 rep_level: 0,
666 def_level: 0,
667 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
668 };
669
670 visitor.dispatch(&schema.root_schema_ptr(), context)
671}
672
673pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
675 let mut visitor = Visitor {
676 next_col_idx: 0,
677 mask: ProjectionMask::all(),
678 };
679
680 let context = VisitorContext {
681 rep_level: 0,
682 def_level: 0,
683 data_type: None,
684 };
685
686 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
687}