1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32 let info = t.get_basic_info();
33 match info.has_repetition() {
34 true => info.repetition(),
35 false => Repetition::REQUIRED,
36 }
37}
38
/// A node of the converted schema tree: the arrow data type chosen for a parquet schema
/// element, together with the repetition/definition levels computed for it during conversion.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level of this node. Conversion starts at 0 at the root and increments it
    /// once for every repeated level on the path (a `REPEATED` field, a list, or a map).
    pub rep_level: i16,
    /// Definition level of this node. Conversion starts at 0 at the root and increments it
    /// for the optional and repeated levels on the path.
    pub def_level: i16,
    /// Whether the arrow field built for this node is nullable.
    pub nullable: bool,
    /// The arrow data type produced for this node.
    pub arrow_type: DataType,
    /// What kind of node this is: a leaf column, a group with children, or a virtual column.
    pub field_type: ParquetFieldType,
}
59
60impl ParquetField {
61 fn into_list(self, name: &str) -> Self {
65 ParquetField {
66 rep_level: self.rep_level,
67 def_level: self.def_level,
68 nullable: false,
69 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70 field_type: ParquetFieldType::Group {
71 children: vec![self],
72 },
73 }
74 }
75
76 pub fn children(&self) -> Option<&[Self]> {
78 match &self.field_type {
79 ParquetFieldType::Primitive { .. } => None,
80 ParquetFieldType::Group { children } => Some(children),
81 ParquetFieldType::Virtual(_) => None,
82 }
83 }
84}
85
/// The kinds of virtual column recognised by `convert_virtual_field`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VirtualColumnType {
    /// Selected for arrow fields whose extension type name is `RowNumber::NAME`.
    RowNumber,
    /// Selected for arrow fields whose extension type name is `RowGroupIndex::NAME`.
    RowGroupIndex,
}
94
/// The kind of a [`ParquetField`] node.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf (primitive) column.
    Primitive {
        /// Index of this leaf, assigned in the order leaves are visited during conversion.
        /// Leaves excluded by the projection mask still consume an index.
        col_idx: usize,
        /// The parquet type of the leaf.
        primitive_type: TypePtr,
    },
    /// A group node (struct, list, or map) holding its converted children in order.
    Group {
        /// The converted children; children removed by the projection are not present.
        children: Vec<ParquetField>,
    },
    /// A virtual column identified from an arrow extension type by `convert_virtual_field`.
    /// NOTE(review): presumably its values are produced by the reader rather than decoded
    /// from a column chunk — confirm.
    Virtual(VirtualColumnType),
}
110
/// State handed from a parent to the schema node being visited.
struct VisitorContext {
    /// Repetition level accumulated so far, before applying the visited node's own repetition.
    rep_level: i16,
    /// Definition level accumulated so far, before applying the visited node's own repetition.
    def_level: i16,
    /// Optional arrow data type hint for the visited node, derived from an embedded arrow
    /// schema when one is supplied.
    data_type: Option<DataType>,
}
118
119impl VisitorContext {
120 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
123 match repetition {
124 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
125 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
126 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
127 }
128 }
129}
130
/// Depth-first walker that converts a parquet schema into a `ParquetField` tree.
struct Visitor {
    /// Index to assign to the next primitive (leaf) column visited.
    next_col_idx: usize,

    /// Projection mask; leaves for which `leaf_included` returns false are skipped.
    mask: ProjectionMask,
}
143
impl Visitor {
    /// Converts a parquet primitive (leaf) type into a `ParquetField`.
    ///
    /// Every call assigns the next leaf column index, whether or not the projection mask
    /// selects the leaf, so indices follow the depth-first order in which leaves are visited.
    /// Returns `Ok(None)` when the mask excludes this leaf.
    ///
    /// The optional arrow type in `context` is forwarded to `convert_primitive` as a hint.
    /// A `REPEATED` primitive is wrapped in a list via `ParquetField::into_list`.
    fn visit_primitive(
        &mut self,
        primitive_type: &TypePtr,
        context: VisitorContext,
    ) -> Result<Option<ParquetField>> {
        // Consume the index before the mask check so excluded leaves still advance it.
        let col_idx = self.next_col_idx;
        self.next_col_idx += 1;

        if !self.mask.leaf_included(col_idx) {
            return Ok(None);
        }

        let repetition = get_repetition(primitive_type);
        let (def_level, rep_level, nullable) = context.levels(repetition);

        let arrow_type = convert_primitive(primitive_type, context.data_type)?;

        let primitive_field = ParquetField {
            rep_level,
            def_level,
            nullable,
            arrow_type,
            field_type: ParquetFieldType::Primitive {
                primitive_type: primitive_type.clone(),
                col_idx,
            },
        };

        Ok(Some(match repetition {
            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
            _ => primitive_field,
        }))
    }

    /// Converts a parquet group into an arrow `Struct`.
    ///
    /// `dispatch` sends every group that is not LIST- or MAP-annotated here.
    ///
    /// If `context` carries an arrow type hint, it must be a `Struct` with the same number of
    /// fields, and each hinted field name must equal the parquet field name at the same
    /// position. Otherwise an error is returned.
    ///
    /// Children excluded by the projection are omitted; if no child remains, the struct is
    /// dropped (`Ok(None)`). A `REPEATED` group is wrapped in a list via
    /// `ParquetField::into_list`.
    fn visit_struct(
        &mut self,
        struct_type: &TypePtr,
        context: VisitorContext,
    ) -> Result<Option<ParquetField>> {
        // `get_repetition` treats a type without a repetition as REQUIRED.
        let repetition = get_repetition(struct_type);
        let (def_level, rep_level, nullable) = context.levels(repetition);

        let parquet_fields = struct_type.get_fields();

        // Validate the arrow hint (if any) against the parquet children.
        let arrow_fields = match &context.data_type {
            Some(DataType::Struct(fields)) => {
                if fields.len() != parquet_fields.len() {
                    return Err(arrow_err!(
                        "incompatible arrow schema, expected {} struct fields got {}",
                        parquet_fields.len(),
                        fields.len()
                    ));
                }
                Some(fields)
            }
            Some(d) => {
                return Err(arrow_err!(
                    "incompatible arrow schema, expected struct got {}",
                    d
                ));
            }
            None => None,
        };

        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
        let mut children = Vec::with_capacity(parquet_fields.len());

        // Visit the children in schema order, pairing each with the hint at the same index.
        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
            let data_type = match arrow_fields {
                Some(fields) => {
                    let field = &fields[idx];
                    if field.name() != parquet_field.name() {
                        return Err(arrow_err!(
                            "incompatible arrow schema, expected field named {} got {}",
                            parquet_field.name(),
                            field.name()
                        ));
                    }
                    Some(field.data_type().clone())
                }
                None => None,
            };

            let arrow_field = arrow_fields.map(|x| &*x[idx]);
            let child_ctx = VisitorContext {
                rep_level,
                def_level,
                data_type,
            };

            if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
                // The arrow data type comes from the converted child (which may differ from
                // the hint); `convert_field` only takes metadata/dictionary info from the hint.
                child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
                children.push(child);
            }
        }

        // Every child was projected away.
        if children.is_empty() {
            return Ok(None);
        }

        let struct_field = ParquetField {
            rep_level,
            def_level,
            nullable,
            arrow_type: DataType::Struct(child_fields.finish().fields),
            field_type: ParquetFieldType::Group { children },
        };

        Ok(Some(match repetition {
            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
            _ => struct_field,
        }))
    }

    /// Converts a MAP / MAP_KEY_VALUE annotated group into an arrow `Map`.
    ///
    /// # Expected layout
    ///
    /// The group must have exactly one `REPEATED` key_value child holding:
    /// - a key, which must not be repeated;
    /// - a value, which must not be repeated.
    ///
    /// If the key_value group has a single child, the map is decoded as a list instead
    /// (see `visit_list`).
    ///
    /// # Arrow hint
    ///
    /// An arrow hint in `context` must be a `Map` whose entries type is a two-field `Struct`.
    ///
    /// # Projection
    ///
    /// Both key and value must survive the projection; otherwise `Ok(None)` is returned.
    fn visit_map(
        &mut self,
        map_type: &TypePtr,
        context: VisitorContext,
    ) -> Result<Option<ParquetField>> {
        // Key and value sit one repetition level below the map (the repeated key_value group).
        let rep_level = context.rep_level + 1;
        // A required map adds one definition level for the repeated key_value group; an
        // optional map adds a second one for its own nullability.
        let (def_level, nullable) = match get_repetition(map_type) {
            Repetition::REQUIRED => (context.def_level + 1, false),
            Repetition::OPTIONAL => (context.def_level + 2, true),
            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
        };

        if map_type.get_fields().len() != 1 {
            return Err(arrow_err!(
                "Map field must have exactly one key_value child, found {}",
                map_type.get_fields().len()
            ));
        }

        let map_key_value = &map_type.get_fields()[0];
        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
            return Err(arrow_err!("Child of map field must be repeated"));
        }

        // With a single child there is no value column: decode the map as a list of that
        // child instead.
        if map_key_value.get_fields().len() == 1 {
            return self.visit_list(map_type, context);
        }

        if map_key_value.get_fields().len() != 2 {
            return Err(arrow_err!(
                "Child of map field must have two children, found {}",
                map_key_value.get_fields().len()
            ));
        }

        let map_key = &map_key_value.get_fields()[0];
        let map_value = &map_key_value.get_fields()[1];

        match map_key.get_basic_info().repetition() {
            Repetition::REPEATED => {
                return Err(arrow_err!("Map keys cannot be repeated"));
            }
            Repetition::REQUIRED | Repetition::OPTIONAL => {
                // Optional keys are accepted here; the arrow key field is forced to be
                // non-nullable when the map type is built below.
            }
        }

        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
            return Err(arrow_err!("Map values cannot be repeated"));
        }

        // Extract the entries field, key/value hints and `sorted` flag from the arrow hint.
        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
            Some(DataType::Map(field, sorted)) => match field.data_type() {
                DataType::Struct(fields) => {
                    if fields.len() != 2 {
                        return Err(arrow_err!(
                            "Map data type should contain struct with two children, got {}",
                            fields.len()
                        ));
                    }

                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
                }
                d => {
                    return Err(arrow_err!("Map data type should contain struct got {}", d));
                }
            },
            Some(d) => {
                return Err(arrow_err!(
                    "incompatible arrow schema, expected map got {}",
                    d
                ));
            }
            None => (None, None, None, false),
        };

        // Key and value are both visited (in this order) even if one is projected away, so
        // leaf column indices keep advancing.
        let maybe_key = {
            let context = VisitorContext {
                rep_level,
                def_level,
                data_type: arrow_key.map(|x| x.data_type().clone()),
            };

            self.dispatch(map_key, context)?
        };

        let maybe_value = {
            let context = VisitorContext {
                rep_level,
                def_level,
                data_type: arrow_value.map(|x| x.data_type().clone()),
            };

            self.dispatch(map_value, context)?
        };

        // The map is only produced when both key and value are selected.
        match (maybe_key, maybe_value) {
            (Some(mut key), Some(mut value)) => {
                let key_field = Arc::new(
                    convert_field(map_key, &mut key, arrow_key)?
                        .with_nullable(false),
                );
                let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
                // Preserve the metadata of the hinted entries field, if any.
                let field_metadata = match arrow_map {
                    Some(field) => field.metadata().clone(),
                    _ => HashMap::default(),
                };

                let map_field = Field::new_struct(
                    map_key_value.name(),
                    [key_field, value_field],
                    false, // the key_value entries struct is always non-nullable
                )
                .with_metadata(field_metadata);

                Ok(Some(ParquetField {
                    rep_level,
                    def_level,
                    nullable,
                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
                    field_type: ParquetFieldType::Group {
                        children: vec![key, value],
                    },
                }))
            }
            _ => Ok(None),
        }
    }

    /// Converts a LIST-annotated group (or a single-child key_value map) into an arrow list.
    ///
    /// Three layouts of the single `REPEATED` child are handled:
    /// 1. **Repeated primitive.** The child itself is the element.
    /// 2. **Repeated group as element.** The repeated group is itself the element when it
    ///    either:
    ///    - has more than one field; or
    ///    - has one field, is not LIST-annotated, does not have a single repeated child,
    ///      and is named `array` or `<list name>_tuple`.
    /// 3. **Standard layout.** Otherwise the group's only field is the element.
    ///
    /// In layouts 1 and 2 the list comes from `ParquetField::into_list`. In layout 3 the
    /// hint's list flavour (`List`, `LargeList`, `FixedSizeList`, `ListView`,
    /// `LargeListView`) is kept; without such a hint a plain `List` is produced.
    fn visit_list(
        &mut self,
        list_type: &TypePtr,
        context: VisitorContext,
    ) -> Result<Option<ParquetField>> {
        if list_type.is_primitive() {
            return Err(arrow_err!(
                "{:?} is a list type and can't be processed as primitive.",
                list_type
            ));
        }

        let fields = list_type.get_fields();
        if fields.len() != 1 {
            return Err(arrow_err!(
                "list type must have a single child, found {}",
                fields.len()
            ));
        }

        let repeated_field = &fields[0];
        if get_repetition(repeated_field) != Repetition::REPEATED {
            return Err(arrow_err!("List child must be repeated"));
        }

        // An optional list adds one definition level for its own nullability.
        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
            Repetition::REQUIRED => (context.def_level, false),
            Repetition::OPTIONAL => (context.def_level + 1, true),
            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
        };

        // Item field of the arrow hint, for any supported list flavour.
        let arrow_field = match &context.data_type {
            Some(DataType::List(f)) => Some(f.as_ref()),
            Some(DataType::LargeList(f)) => Some(f.as_ref()),
            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
            Some(DataType::ListView(f)) => Some(f.as_ref()),
            Some(DataType::LargeListView(f)) => Some(f.as_ref()),
            Some(d) => {
                return Err(arrow_err!(
                    "incompatible arrow schema, expected list got {}",
                    d
                ));
            }
            None => None,
        };

        // Layout 1: the repeated child is a primitive and is itself the element.
        if repeated_field.is_primitive() {
            let context = VisitorContext {
                rep_level: context.rep_level,
                def_level,
                data_type: arrow_field.map(|f| f.data_type().clone()),
            };

            return match self.visit_primitive(repeated_field, context) {
                Ok(Some(mut field)) => {
                    // `into_list` builds a non-nullable wrapper; apply the LIST group's
                    // own nullability.
                    field.nullable = nullable;
                    Ok(Some(field))
                }
                r => r,
            };
        }

        // Layout 2: the repeated group itself is the element.
        let items = repeated_field.get_fields();
        if items.len() != 1
            || (!repeated_field.is_list()
                && !repeated_field.has_single_repeated_child()
                && (repeated_field.name() == "array"
                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
        {
            let context = VisitorContext {
                rep_level: context.rep_level,
                def_level,
                data_type: arrow_field.map(|f| f.data_type().clone()),
            };

            return match self.visit_struct(repeated_field, context) {
                Ok(Some(mut field)) => {
                    // `into_list` builds a non-nullable wrapper; apply the LIST group's
                    // own nullability.
                    field.nullable = nullable;
                    Ok(Some(field))
                }
                r => r,
            };
        }

        // Layout 3: the repeated group's only field is the element. The repeated level
        // adds one repetition level and one definition level.
        let item_type = &items[0];
        let rep_level = context.rep_level + 1;
        let def_level = def_level + 1;

        let new_context = VisitorContext {
            def_level,
            rep_level,
            data_type: arrow_field.map(|f| f.data_type().clone()),
        };

        match self.dispatch(item_type, new_context) {
            Ok(Some(mut item)) => {
                let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);

                // Keep the list flavour requested by the arrow hint; default to `List`.
                let arrow_type = match context.data_type {
                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
                    Some(DataType::FixedSizeList(_, len)) => {
                        DataType::FixedSizeList(item_field, len)
                    }
                    Some(DataType::ListView(_)) => DataType::ListView(item_field),
                    Some(DataType::LargeListView(_)) => DataType::LargeListView(item_field),
                    _ => DataType::List(item_field),
                };

                Ok(Some(ParquetField {
                    rep_level,
                    def_level,
                    nullable,
                    arrow_type,
                    field_type: ParquetFieldType::Group {
                        children: vec![item],
                    },
                }))
            }
            r => r,
        }
    }

    /// Routes `cur_type` to the matching visitor.
    ///
    /// Primitives go to `visit_primitive`. Groups are routed by their converted type:
    /// - LIST goes to `visit_list`;
    /// - MAP / MAP_KEY_VALUE go to `visit_map`;
    /// - everything else goes to `visit_struct`.
    fn dispatch(
        &mut self,
        cur_type: &TypePtr,
        context: VisitorContext,
    ) -> Result<Option<ParquetField>> {
        if cur_type.is_primitive() {
            self.visit_primitive(cur_type, context)
        } else {
            match cur_type.get_basic_info().converted_type() {
                ConvertedType::LIST => self.visit_list(cur_type, context),
                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
                    self.visit_map(cur_type, context)
                }
                _ => self.visit_struct(cur_type, context),
            }
        }
    }
}
561
562pub(super) fn convert_virtual_field(
572 arrow_field: &Field,
573 parent_rep_level: i16,
574 parent_def_level: i16,
575) -> Result<ParquetField> {
576 let nullable = arrow_field.is_nullable();
577 let def_level = if nullable {
578 parent_def_level + 1
579 } else {
580 parent_def_level
581 };
582
583 let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
585 ParquetError::ArrowError(format!(
586 "virtual column field '{}' must have an extension type",
587 arrow_field.name()
588 ))
589 })?;
590
591 let virtual_type = match extension_name {
592 RowNumber::NAME => VirtualColumnType::RowNumber,
593 RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
594 _ => {
595 return Err(ParquetError::ArrowError(format!(
596 "unsupported virtual column type '{}' for field '{}'",
597 extension_name,
598 arrow_field.name()
599 )));
600 }
601 };
602
603 Ok(ParquetField {
604 rep_level: parent_rep_level,
605 def_level,
606 nullable,
607 arrow_type: arrow_field.data_type().clone(),
608 field_type: ParquetFieldType::Virtual(virtual_type),
609 })
610}
611
612fn convert_field(
617 parquet_type: &Type,
618 field: &mut ParquetField,
619 arrow_hint: Option<&Field>,
620) -> Result<Field, ParquetError> {
621 let name = parquet_type.name();
622 let data_type = field.arrow_type.clone();
623 let nullable = field.nullable;
624
625 match arrow_hint {
626 Some(hint) => {
627 #[allow(deprecated)]
629 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
630 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
631 {
632 #[allow(deprecated)]
633 Field::new_dict(name, data_type, nullable, id, ordered)
634 }
635 _ => Field::new(name, data_type, nullable),
636 };
637
638 Ok(field.with_metadata(hint.metadata().clone()))
639 }
640 None => {
641 let mut ret = Field::new(name, data_type, nullable);
642 let basic_info = parquet_type.get_basic_info();
643 if basic_info.has_id() {
644 let mut meta = HashMap::with_capacity(1);
645 meta.insert(
646 PARQUET_FIELD_ID_META_KEY.to_string(),
647 basic_info.id().to_string(),
648 );
649 ret.set_metadata(meta);
650 }
651 try_add_extension_type(ret, parquet_type)
652 }
653 }
654}
655
656pub fn convert_schema(
662 schema: &SchemaDescriptor,
663 mask: ProjectionMask,
664 embedded_arrow_schema: Option<&Fields>,
665) -> Result<Option<ParquetField>> {
666 let mut visitor = Visitor {
667 next_col_idx: 0,
668 mask,
669 };
670
671 let context = VisitorContext {
672 rep_level: 0,
673 def_level: 0,
674 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
675 };
676
677 visitor.dispatch(&schema.root_schema_ptr(), context)
678}
679
680pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
682 let mut visitor = Visitor {
683 next_col_idx: 0,
684 mask: ProjectionMask::all(),
685 };
686
687 let context = VisitorContext {
688 rep_level: 0,
689 def_level: 0,
690 data_type: None,
691 };
692
693 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
694}